Skip to content

Commit

Permalink
Merge 0f319ca into 184b6b7
Browse files Browse the repository at this point in the history
  • Loading branch information
sappo committed Mar 8, 2019
2 parents 184b6b7 + 0f319ca commit f0ce1c7
Show file tree
Hide file tree
Showing 2 changed files with 423 additions and 13 deletions.
359 changes: 359 additions & 0 deletions src/main/java/org/zeromq/ZMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import zmq.Ctx;
Expand Down Expand Up @@ -818,6 +819,121 @@ public static class Socket implements Closeable
private final SocketBase base;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

// Network data encoding macros

// Put a 1-byte number to the frame
private static void putNumber1(ByteBuffer needle, int value)
{
needle.put((byte) value);
}

// Get a 1-byte number to the frame
// then make it unsigned
private static byte getNumber1(ByteBuffer needle)
{
int value = needle.get();
if (value < 0) {
value = (0xff) & value;
}
return (byte) value;
}

// Put a 2-byte number to the frame
private static void putNumber2(ByteBuffer needle, int value)
{
needle.putShort((short) value);
}

// Get a 2-byte number to the frame
private static int getNumber2(ByteBuffer needle)
{
int value = needle.getShort();
if (value < 0) {
value = (0xffff) & value;
}
return value;
}

// Put a 4-byte number to the frame
private static void putNumber4(ByteBuffer needle, long value)
{
needle.putInt((int) value);
}

// Get a 4-byte number to the frame
// then make it unsigned
private static long getNumber4(ByteBuffer needle)
{
long value = needle.getInt();
if (value < 0) {
value = (0xffffffff) & value;
}
return value;
}

// Put a 8-byte number to the frame
private static void putNumber8(ByteBuffer needle, long value)
{
needle.putLong(value);
}

// Get a 8-byte number to the frame
private static long getNumber8(ByteBuffer needle)
{
return needle.getLong();
}

// Put a block to the frame
private static void putBlock(ByteBuffer needle, byte[] value, int size)
{
needle.put(value, 0, size);
}

// Get a block from the frame
private static byte[] getBlock(ByteBuffer needle, int size)
{
byte[] value = new byte[size];
needle.get(value);

return value;
}

// Put a string to the frame
private static void putString(ByteBuffer needle, String value)
{
byte[] bytes = value.getBytes(ZMQ.CHARSET);
needle.put((byte) bytes.length);
needle.put(bytes);
}

// Get a string from the frame
private static String getString(ByteBuffer needle)
{
int size = getNumber1(needle);
byte[] value = new byte[size];
needle.get(value);

return new String(value, ZMQ.CHARSET);
}

// Put a string to the frame
private static void putLongString(ByteBuffer needle, String value)
{
byte[] bytes = value.getBytes(ZMQ.CHARSET);
needle.putInt(bytes.length);
needle.put(bytes);
}

// Get a string from the frame
private static String getLongString(ByteBuffer needle)
{
long size = getNumber4(needle);
byte[] value = new byte[(int) size];
needle.get(value);

return new String(value, ZMQ.CHARSET);
}

/**
* Class constructor.
*
Expand Down Expand Up @@ -3207,6 +3323,168 @@ public boolean send(byte[] data, int flags)
return false;
}

private static final int ZSOCK_BSEND_MAX_FRAMES = 32; // Arbitrary limit, for now

/**
* Send a binary encoded 'picture' message to the socket (or actor). This
* method is similar to {@link #send(byte[])}, except the arguments are encoded in a
* binary format that is compatible with zproto, and is designed to reduce
* memory allocations.
*
* @param picture The picture argument is a string that defines the
* type of each argument. Supports these argument types:
* <table>
* <tr><th align="left">pattern</th><th align="left">java type</th><th align="left">zproto type</th></tr>
* <tr><td>1</td><td>int</td><td>type = "number" size = "1"</td></tr>
* <tr><td>2</td><td>int</td><td>type = "number" size = "2"</td></tr>
* <tr><td>4</td><td>long</td><td>type = "number" size = "3"</td></tr>
* <tr><td>8</td><td>long</td><td>type = "number" size = "4"</td></tr>
* <tr><td>s</td><td>String, 0-255 chars</td><td>type = "string"</td></tr>
* <tr><td>S</td><td>String, 0-2^32-1 chars</td><td>type = "longstr"</td></tr>
* <tr><td>c</td><td>byte[], 0-2^32-1 bytes</td><td>type = "chunk"</td></tr>
* <tr><td>f</td><td>ZFrame</td><td>type = "frame"</td></tr>
* <tr><td>m</td><td>ZMsg</td><td>type = "msg"</td></tr>
* </table>
* @param args Arguments according to the picture
* @return true when it has been queued on the socket and ØMQ has assumed responsibility for the message.
* This does not indicate that the message has been transmitted to the network.
* @apiNote Does not change or take ownership of any arguments.
*/
public boolean send(String picture, Object... args)
{
// Pass 1: calculate total size of data frame
int frameSize = 0;
ZFrame[] frames = new ZFrame[ZSOCK_BSEND_MAX_FRAMES];
final AtomicInteger nbrFrames = new AtomicInteger(0); // Size of this table

for (int index = 0; index < picture.length(); index++) {
char pattern = picture.charAt(index);
switch (pattern) {
case '1': {
frameSize += 1;
break;
}
case '2': {
frameSize += 2;
break;
}
case '4': {
frameSize += 4;
break;
}
case '8': {
frameSize += 8;
break;
}
case 's': {
String string = (String) args[index];
frameSize += 1 + (string != null ? string.getBytes(ZMQ.CHARSET).length : 0);
break;
}
case 'S': {
String string = (String) args[index];
frameSize += 4 + (string != null ? string.getBytes(ZMQ.CHARSET).length : 0);
break;
}
case 'c': {
byte[] block = (byte[]) args[index];
frameSize += 4 + block.length;
break;
}
case 'f': {
ZFrame frame = (ZFrame) args[index];
if (nbrFrames.get() > ZSOCK_BSEND_MAX_FRAMES) {
throw new ZMQException("Max no of frames exceeded", ZError.EPROTO);
}

frames[nbrFrames.getAndIncrement()] = frame;
break;
}
case 'm': {
if (index != picture.length() - 1) {
throw new ZMQException("'m' (ZMsg) only valid at end of picture", ZError.EPROTO);
}

ZMsg msg = (ZMsg) args[index];
if (msg != null) {
msg.forEach(frame -> {
if (nbrFrames.get() > ZSOCK_BSEND_MAX_FRAMES) {
throw new ZMQException("Max no of frames exceeded", ZError.EPROTO);
}

frames[nbrFrames.getAndIncrement()] = frame;
});
}
else {
frames[nbrFrames.getAndIncrement()] = new ZFrame();
}

break;
}
default:
throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO);
}
}

// Pass 2: encode data into data frame
ByteBuffer needle = ByteBuffer.allocate(frameSize);
for (int index = 0; index < picture.length(); index++) {
char pattern = picture.charAt(index);
switch (pattern) {
case '1': {
putNumber1(needle, (int) args[index]);
break;
}
case '2': {
putNumber2(needle, (int) args[index]);
break;
}
case '4': {
putNumber4(needle, (long) args[index]);
break;
}
case '8': {
putNumber8(needle, (long) args[index]);
break;
}
case 's': {
putString(needle, (String) args[index]);
break;
}
case 'S': {
putLongString(needle, (String) args[index]);
break;
}
case 'c': {
byte[] block = (byte[]) args[index];
putNumber4(needle, block.length);
putBlock(needle, block, block.length);
break;
}
case 'f':
case 'm':
break;
default:
throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO);
}
}

// Now send the data frame
needle.flip();
boolean rc = sendByteBuffer(needle, nbrFrames.get() > 0 ? zmq.ZMQ.ZMQ_SNDMORE : 0) > 0;
if (rc) {
// Now send any additional frames
for (int frameNbr = 0; frameNbr < nbrFrames.get(); frameNbr++) {
boolean more = frameNbr < nbrFrames.get() - 1;
boolean success = frames[frameNbr].sendAndDestroy(this, more ? zmq.ZMQ.ZMQ_SNDMORE : 0);
if (!rc) {
break;
}
}
}
return rc;
}

/**
* Queues a message created from data, so it can be sent.
*
Expand Down Expand Up @@ -3349,6 +3627,87 @@ public int recv(byte[] buffer, int offset, int len, int flags)
return -1;
}

// This is the largest size we allow for an incoming longstr or chunk (1M)
private static final int MAX_ALLOC_SIZE = 1024 * 1024;

/**
* Receive a binary encoded 'picture' message from the socket (or actor).
* This method is similar to {@link #recv()}, except the arguments are encoded
* in a binary format that is compatible with zproto, and is designed to
* reduce memory allocations.
*
* @param picture The picture argument is a string that defines
* the type of each argument. See {@link #send(String, Object...)}
* for the supported argument types.
* @return Object array that contains the decoded objects according to the picture
**/
public Object[] recv(final String picture)
{
// Get the data frame
final ByteBuffer needle = ByteBuffer.wrap(recv());
if (needle == null) {
return null;
}

Object[] results = new Object[picture.length()];
for (int index = 0; index < picture.length(); index++) {
char pattern = picture.charAt(index);
switch (pattern) {
case '1': {
results[index] = getNumber1(needle);
break;
}
case '2': {
results[index] = getNumber2(needle);
break;
}
case '4': {
results[index] = getNumber4(needle);
break;
}
case '8': {
results[index] = getNumber8(needle);
break;
}
case 's': {
results[index] = getString(needle);
break;
}
case 'S': {
results[index] = getLongString(needle);
break;
}
case 'c': {
int blockSize = (int) getNumber4(needle);
if (blockSize > MAX_ALLOC_SIZE) {
throw new ZMQException(
"block size " + blockSize + "larger than the maximum " + MAX_ALLOC_SIZE,
ZError.EMSGSIZE);
}
results[index] = getBlock(needle, blockSize);
break;
}
case 'f': {
// Get next frame off socket
results[index] = ZFrame.recvFrame(this);
break;
}
case 'm': {
if (index != picture.length() - 1) {
throw new ZMQException("'m' (ZMsg) only valid at end of picture", ZError.EPROTO);
}

// Get zero or more remaining frames
results[index] = ZMsg.recvMsg(this);
break;
}
default:
throw new ZMQException("invalid picture element '" + pattern + "'", ZError.EPROTO);
}
}
return results;
}

/**
* Receives a message into the specified ByteBuffer.
*
Expand Down
Loading

0 comments on commit f0ce1c7

Please sign in to comment.