Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,18 @@ public class Options {
* {@link Builder#socketSoLinger(int) socketSoLinger}.
*/
public static final String PROP_SOCKET_SO_LINGER = PFX + "socket.so.linger";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#receiveBufferSize(int) receiveBufferSize}.
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
*/
public static final String PROP_SOCKET_RECEIVE_BUFFER_SIZE = PFX + "socket.receive.buffer.size";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#sendBufferSize(int) sendBufferSize}.
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
*/
public static final String PROP_SOCKET_SEND_BUFFER_SIZE = PFX + "socket.send.buffer.size";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#reconnectBufferSize(long) reconnectBufferSize}.
Expand Down Expand Up @@ -654,6 +666,8 @@ public class Options {
private final int socketReadTimeoutMillis;
private final Duration socketWriteTimeout;
private final int socketSoLinger;
private final int receiveBufferSize;
private final int sendBufferSize;
private final Duration pingInterval;
private final Duration requestCleanupInterval;
private final int maxPingsOut;
Expand Down Expand Up @@ -797,6 +811,8 @@ public static class Builder {
private int socketReadTimeoutMillis = 0;
private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT;
private int socketSoLinger = -1;
private int receiveBufferSize = -1;
private int sendBufferSize = -1;
private Duration pingInterval = DEFAULT_PING_INTERVAL;
private Duration requestCleanupInterval = DEFAULT_REQUEST_CLEANUP_INTERVAL;
private int maxPingsOut = DEFAULT_MAX_PINGS_OUT;
Expand Down Expand Up @@ -942,6 +958,8 @@ public Builder properties(Properties props) {
intProperty(props, PROP_SOCKET_READ_TIMEOUT_MS, -1, i -> this.socketReadTimeoutMillis = i);
durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d);
intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i);
intProperty(props, PROP_SOCKET_RECEIVE_BUFFER_SIZE, -1, i -> this.receiveBufferSize = i);
intProperty(props, PROP_SOCKET_SEND_BUFFER_SIZE, -1, i -> this.sendBufferSize = i);

intGtEqZeroProperty(props, PROP_MAX_CONTROL_LINE, DEFAULT_MAX_CONTROL_LINE, i -> this.maxControlLine = i);
durationProperty(props, PROP_PING_INTERVAL, DEFAULT_PING_INTERVAL, d -> this.pingInterval = d);
Expand Down Expand Up @@ -1424,6 +1442,30 @@ public Builder socketSoLinger(int socketSoLinger) {
return this;
}

/**
* Set the value of the socket SO_RCVBUF property in bytes
* The SO_RCVBUF option is used by the platform's networking code as a hint for the size to set the underlying network I/O buffers.
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
* @param receiveBufferSize the size in bytes
* @return the Builder for chaining
*/
public Builder receiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
return this;
}

/**
* Set the value of the socket SO_SNDBUF property in bytes
* The SO_SNDBUF option is used by the platform's networking code as a hint for the size to set the underlying network I/O buffers.
* MAY OVERRIDE THE UNDERLYING JAVA SOCKET IMPLEMENTATION - USE AT YOUR OWN RISK
* @param sendBufferSize the size in bytes
* @return the Builder for chaining
*/
public Builder sendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
return this;
}

/**
* Set the interval between attempts to pings the server. These pings are automated,
* and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library
Expand Down Expand Up @@ -1964,10 +2006,18 @@ else if (useDefaultTls) {
throw new IllegalArgumentException("Socket Write Timeout cannot be less than " + MINIMUM_SOCKET_WRITE_TIMEOUT_NANOS + " nanoseconds.");
}

if (socketSoLinger < 0) {
if (socketSoLinger < 1) {
socketSoLinger = -1;
}

if (receiveBufferSize < 1) {
receiveBufferSize = -1;
}

if (sendBufferSize < 1) {
sendBufferSize = -1;
}

if (errorListener == null) {
errorListener = new ErrorListenerLoggerImpl();
}
Expand Down Expand Up @@ -2016,6 +2066,8 @@ public Builder(Options o) {
this.socketReadTimeoutMillis = o.socketReadTimeoutMillis;
this.socketWriteTimeout = o.socketWriteTimeout;
this.socketSoLinger = o.socketSoLinger;
this.receiveBufferSize = o.receiveBufferSize;
this.sendBufferSize = o.sendBufferSize;
this.pingInterval = o.pingInterval;
this.requestCleanupInterval = o.requestCleanupInterval;
this.maxPingsOut = o.maxPingsOut;
Expand Down Expand Up @@ -2086,6 +2138,8 @@ private Options(Builder b) {
this.socketReadTimeoutMillis = b.socketReadTimeoutMillis;
this.socketWriteTimeout = b.socketWriteTimeout;
this.socketSoLinger = b.socketSoLinger;
this.receiveBufferSize = b.receiveBufferSize;
this.sendBufferSize = b.sendBufferSize;
this.pingInterval = b.pingInterval;
this.requestCleanupInterval = b.requestCleanupInterval;
this.maxPingsOut = b.maxPingsOut;
Expand Down Expand Up @@ -2484,6 +2538,20 @@ public int getSocketSoLinger() {
return socketSoLinger;
}

/**
* @return the number of bytes to set the for the SO_RCVBUF property on the socket
*/
public int getReceiveBufferSize() {
return receiveBufferSize;
}

/**
* @return the number of bytes to set the for the SO_SNDBUF property on the socket
*/
public int getSendBufferSize() {
return sendBufferSize;
}

/**
* @return the pingInterval, see {@link Builder#pingInterval(Duration) pingInterval()} in the builder doc
*/
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/io/nats/client/impl/SocketDataPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ public class SocketDataPort implements DataPort {
protected Socket socket;
protected boolean isSecure = false;
protected int soLinger;
protected int receiveBufferSize;
protected int sendBufferSize;

protected InputStream in;
protected OutputStream out;

@Override
public void afterConstruct(Options options) {
soLinger = options.getSocketSoLinger();
receiveBufferSize = options.getReceiveBufferSize();
sendBufferSize = options.getSendBufferSize();
}

@Override
Expand Down Expand Up @@ -82,12 +86,20 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti
socket = createSocket(options);
socket.connect(new InetSocketAddress(host, port), (int) timeout);
}
if (options.getSocketReadTimeoutMillis() > 0) {
socket.setSoTimeout(options.getSocketReadTimeoutMillis());
}

if (soLinger > -1) {
if (soLinger > 0) {
socket.setSoLinger(true, soLinger);
}
if (options.getSocketReadTimeoutMillis() > 0) {
socket.setSoTimeout(options.getSocketReadTimeoutMillis());

if (receiveBufferSize > 0) {
socket.setReceiveBufferSize(receiveBufferSize);
}

if (sendBufferSize > 0) {
socket.setSendBufferSize(sendBufferSize);
}

if (isWebsocketScheme(nuri.getScheme())) {
Expand Down
Loading