Skip to content

Commit

Permalink
Merge 9cbbf4d into f589efe
Browse files Browse the repository at this point in the history
  • Loading branch information
fredoboulo committed Mar 19, 2019
2 parents f589efe + 9cbbf4d commit 5d220a9
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 66 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/zeromq/proto/ZNeedle.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public String getString()
}

// Put a collection of strings to the frame
public void put(Collection<String> elements)
public void putList(Collection<String> elements)
{
if (elements == null) {
putNumber1(0);
Expand All @@ -192,7 +192,7 @@ public List<String> getList()
}

// Put a map of strings to the frame
public void put(Map<String, String> map)
public void putMap(Map<String, String> map)
{
if (map == null) {
putNumber1(0);
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/zmq/Msg.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.nio.channels.SocketChannel;

import zmq.io.Metadata;
import zmq.util.Utils;
import zmq.util.Wire;

public class Msg
Expand Down Expand Up @@ -57,6 +58,20 @@ public Msg put(ByteBuffer src, int off, int len)
return this;
}

@Override
public Msg putShortString(String data)
{
if (data == null) {
return this;
}
int length = data.length();
Utils.checkArgument(length < 256, "String must be strictly smaller than 256 characters");
out.write((byte) length);
out.write(data.getBytes(ZMQ.CHARSET), 0, length);
setWriteIndex(getWriteIndex() + length + 1);
return this;
}

@Override
public void setFlags(int flags)
{
Expand Down Expand Up @@ -389,4 +404,23 @@ public void transfer(ByteBuffer destination, int srcOffset, int srcLength)
destination.put(buf);
buf.limit(limit).position(position);
}

/**
* Puts a string into the message, prefixed with its length.
* Users shall size the message by adding 1 to the length of the string:
* It needs to be able to accommodate (data.length+1) more bytes.
*
* @param data a string shorter than 256 characters. If null, defaults to a no-op.
* @return the same message.
*/
public Msg putShortString(String data)
{
if (data == null) {
return this;
}
ByteBuffer dup = buf.duplicate();
dup.position(writeIndex);
writeIndex += Wire.putShortString(dup, data);
return this;
}
}
20 changes: 0 additions & 20 deletions src/main/java/zmq/io/Msgs.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package zmq.io;

import zmq.Msg;
import zmq.ZMQ;

public class Msgs
{
Expand Down Expand Up @@ -38,23 +37,4 @@ public static boolean startsWith(Msg msg, String data, boolean includeLength)
}
return comparison;
}

/**
* Puts a string into the message, prefixed with its length.
* Users shall size the message by adding 1 to the length of the string.
*
* @param msg the message to fill. It needs to be able to accommodate (data.length+1) bytes more.
* @param data a string shorter than 256 characters.
* @return the same message.
*/
public static Msg put(Msg msg, String data)
{
final int length = data.length();
assert (length < 256);

msg.put((byte) length);
msg.put(data.getBytes(ZMQ.CHARSET));

return msg;
}
}
4 changes: 2 additions & 2 deletions src/main/java/zmq/io/StreamEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ private Msg producePingMessage()

Msg msg = new Msg(7 + heartbeatContext.length);
msg.setFlags(Msg.COMMAND);
Msgs.put(msg, "PING");
msg.putShortString("PING");
Wire.putUInt16(msg, options.heartbeatTtl);
msg.put(heartbeatContext);

Expand All @@ -1185,7 +1185,7 @@ private Msg producePongMessage(byte[] pingContext)

Msg msg = new Msg(5 + pingContext.length);
msg.setFlags(Msg.COMMAND);
Msgs.put(msg, "PONG");
msg.putShortString("PONG");
msg.put(pingContext);

msg = mechanism.encode(msg);
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/zmq/io/mechanism/Mechanism.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,6 @@ protected final int receiveAndProcessZapReply()
return parseMetadata(msgs.get(6), 0, true);
}

protected final void appendData(Msg msg, String data)
{
Msgs.put(msg, data);
}

public void destroy()
{
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/zmq/io/mechanism/NullMechanism.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ public int nextHandshakeCommand(Msg msg)
}

if (zapReplyReceived && !OK.equals(statusCode)) {
appendData(msg, ERROR);
appendData(msg, statusCode);
msg.putShortString(ERROR);
msg.putShortString(statusCode);

errorCommandSent = true;
return 0;
}

// Add mechanism string
appendData(msg, READY);
msg.putShortString(READY);

// Add socket type property
String socketType = socketType(options.type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public Msg encode(Msg msg)
assert (rc == 0);

Msg encoded = new Msg(16 + mlen - Curve.Size.BOXZERO.bytes());
appendData(encoded, "MESSAGE");
encoded.putShortString("MESSAGE");
encoded.put(messageNonce, 16, 8);
encoded.put(messageBox, Curve.Size.BOXZERO.bytes(), mlen - Curve.Size.BOXZERO.bytes());

Expand Down Expand Up @@ -252,7 +252,7 @@ private int produceHello(Msg msg)
if (rc != 0) {
return -1;
}
appendData(msg, "HELLO");
msg.putShortString("HELLO");
// CurveZMQ major and minor version numbers
msg.put(1);
msg.put(0);
Expand Down Expand Up @@ -359,7 +359,7 @@ private int produceInitiate(Msg msg)
return -1;
}

appendData(msg, "INITIATE");
msg.putShortString("INITIATE");
// Cookie provided by the server in the WELCOME command
msg.put(cnCookie);
// Short nonce, prefixed by "CurveZMQINITIATE"
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/zmq/io/mechanism/curve/CurveServerMechanism.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public Msg encode(Msg msg)
assert (rc == 0);

Msg encoded = new Msg(16 + mlen - Curve.Size.BOXZERO.bytes());
appendData(encoded, "MESSAGE");
encoded.putShortString("MESSAGE");
encoded.put(messageNonce, 16, 8);
encoded.put(messageBox, Curve.Size.BOXZERO.bytes(), mlen - Curve.Size.BOXZERO.bytes());

Expand Down Expand Up @@ -337,7 +337,7 @@ private int produceWelcome(Msg msg)
if (rc == -1) {
return -1;
}
appendData(msg, "WELCOME");
msg.putShortString("WELCOME");
msg.put(welcomeNonce, 8, 16);
msg.put(welcomeCiphertext, Curve.Size.BOXZERO.bytes(), 144);

Expand Down Expand Up @@ -474,7 +474,7 @@ private int produceReady(Msg msg)
int rc = cryptoBox.afternm(readyBox, readyPlaintext, mlen, readyNonce, cnPrecom);
assert (rc == 0);

appendData(msg, "READY");
msg.putShortString("READY");
// Short nonce, prefixed by "CurveZMQREADY---"
msg.put(readyNonce, 16, 8);
// Box [metadata](S'->C')
Expand All @@ -490,8 +490,8 @@ private int produceError(Msg msg)
{
assert (statusCode != null && statusCode.length() == 3);

appendData(msg, "ERROR");
appendData(msg, statusCode);
msg.putShortString("ERROR");
msg.putShortString(statusCode);

return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ private int produceHello(Msg msg)
String plainPassword = options.plainPassword;
assert (plainPassword.length() < 256);

appendData(msg, "HELLO");
appendData(msg, plainUsername);
appendData(msg, plainPassword);
msg.putShortString("HELLO");
msg.putShortString(plainUsername);
msg.putShortString(plainPassword);

return 0;
}
Expand All @@ -127,7 +127,7 @@ private int processWelcome(Msg msg)
private int produceInitiate(Msg msg)
{
// Add mechanism string
appendData(msg, "INITIATE");
msg.putShortString("INITIATE");

// Add socket type property
String socketType = socketType(options.type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ else if (rc == ZError.EAGAIN) {

private int produceWelcome(Msg msg)
{
appendData(msg, "WELCOME");
msg.putShortString("WELCOME");
return 0;
}

Expand All @@ -197,7 +197,7 @@ private int produceInitiate(Msg msg)
private int produceReady(Msg msg)
{
// Add command name
appendData(msg, "READY");
msg.putShortString("READY");

// Add socket type property
String socketType = socketType(options.type);
Expand All @@ -215,8 +215,8 @@ private int produceError(Msg msg)
{
assert (statusCode != null && statusCode.length() == 3);

appendData(msg, "ERROR");
appendData(msg, statusCode);
msg.putShortString("ERROR");
msg.putShortString(statusCode);

return 0;
}
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/zmq/util/Wire.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,23 @@ public static long getUInt64(Msg msg, int offset)
}

// strings
public static void putShortString(ByteBuffer buf, String value)
public static int putShortString(ByteBuffer buf, String value)
{
putShortString(ZMQ.CHARSET, buf, value);
return putShortString(ZMQ.CHARSET, buf, value);
}

public static String getShortString(ByteBuffer buf, int offset)
{
return getShortString(ZMQ.CHARSET, buf, offset);
}

public static void putShortString(Charset charset, ByteBuffer buf, String value)
public static int putShortString(Charset charset, ByteBuffer buf, String value)
{
int length = value.length();
Utils.checkArgument(length < 256, "String must be strictly smaller than 256 characters");
putUInt8(buf, length);
buf.put(value.getBytes(charset));
return length + 1;
}

public static String getShortString(Charset charset, ByteBuffer buf, int offset)
Expand All @@ -173,22 +174,23 @@ public static String getShortString(Charset charset, ByteBuffer buf, int offset)
return extractString(charset, buf, offset, length, 1);
}

public static void putLongString(ByteBuffer buf, String value)
public static int putLongString(ByteBuffer buf, String value)
{
putLongString(ZMQ.CHARSET, buf, value);
return putLongString(ZMQ.CHARSET, buf, value);
}

public static String getLongString(ByteBuffer buf, int offset)
{
return getLongString(ZMQ.CHARSET, buf, offset);
}

public static void putLongString(Charset charset, ByteBuffer buf, String value)
public static int putLongString(Charset charset, ByteBuffer buf, String value)
{
int length = value.length();
Utils.checkArgument(length < 0x7fffffff, "String must be smaller than 2^31-1 characters");
Wire.putUInt32(buf, length);
buf.put(value.getBytes(charset));
return length + 4;
}

public static String getLongString(Charset charset, ByteBuffer buf, int offset)
Expand Down
20 changes: 10 additions & 10 deletions src/test/java/org/zeromq/proto/ZNeedleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ public void testList()
ZFrame frame = new ZFrame(new byte[26]);
ZNeedle needle = new ZNeedle(frame);

needle.put(Arrays.asList("1", "2", "34", "567"));
needle.put(Arrays.asList("864", "43", "9", "0"));
needle.putList(Arrays.asList("1", "2", "34", "567"));
needle.putList(Arrays.asList("864", "43", "9", "0"));

needle = new ZNeedle(frame);
assertThat(needle.getList(), is(Arrays.asList("1", "2", "34", "567")));
Expand All @@ -150,7 +150,7 @@ public void testNullList()
ZFrame frame = new ZFrame(new byte[1]);
ZNeedle needle = new ZNeedle(frame);

needle.put((List<String>) null);
needle.putList((List<String>) null);

needle = new ZNeedle(frame);
assertThat(needle.getList(), is(Collections.emptyList()));
Expand All @@ -162,7 +162,7 @@ public void testEmptyList()
ZFrame frame = new ZFrame(new byte[1]);
ZNeedle needle = new ZNeedle(frame);

needle.put(Collections.emptyList());
needle.putList(Collections.emptyList());

needle = new ZNeedle(frame);
assertThat(needle.getList(), is(Collections.emptyList()));
Expand All @@ -178,8 +178,8 @@ public void testMap()
map.put("key", "value");
map.put("1", "2");
map.put("34", "567");
needle.put(map);
needle.put(map);
needle.putMap(map);
needle.putMap(map);

needle = new ZNeedle(frame);
assertThat(needle.getMap(), is(map));
Expand All @@ -192,7 +192,7 @@ public void testNullMap()
ZFrame frame = new ZFrame(new byte[1]);
ZNeedle needle = new ZNeedle(frame);

needle.put((Map<String, String>) null);
needle.putMap((Map<String, String>) null);

needle = new ZNeedle(frame);
assertThat(needle.getMap(), is(Collections.emptyMap()));
Expand All @@ -204,7 +204,7 @@ public void testEmptyMap()
ZFrame frame = new ZFrame(new byte[1]);
ZNeedle needle = new ZNeedle(frame);

needle.put(Collections.emptyMap());
needle.putMap(Collections.emptyMap());

needle = new ZNeedle(frame);
assertThat(needle.getMap(), is(Collections.emptyMap()));
Expand All @@ -218,7 +218,7 @@ public void testMapIncorrectKey()

Map<String, String> map = new HashMap<>();
map.put("ke=", "value");
needle.put(map);
needle.putMap(map);
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -229,6 +229,6 @@ public void testMapIncorrectValue()

Map<String, String> map = new HashMap<>();
map.put("key", "=alue");
needle.put(map);
needle.putMap(map);
}
}

0 comments on commit 5d220a9

Please sign in to comment.