Skip to content
Permalink
Browse files
fix testSetNetworkTimeoutEnforcement test failure (#1681)
* fix: bug in pgstream for replication

* fix testSetNetworkTimeoutEnforcement test failure.

* fix testAsyncNotifyWithTimeout test failure

* address checkstyle failure

* respect socket timeout setting while the read operation

Co-authored-by: Dave Cramer <davecramer@gmail.com>
  • Loading branch information
hyunkshinft and davecramer committed Jan 28, 2020
1 parent 2ea7311 commit 799e78d578573bd520ff04c6bd72a97c50cb984d
@@ -136,13 +136,16 @@ public boolean hasMessagePending() throws IOException {
}

int soTimeout = getNetworkTimeout();
setNetworkTimeout(1);
connection.setSoTimeout(1);
try {
if (!pgInput.ensureBytes(1, false)) {
return false;
}
available = (pgInput.peek() != -1);
} catch (SocketTimeoutException e) {
return false;
} finally {
setNetworkTimeout(soTimeout);
connection.setSoTimeout(soTimeout);
}

/*
@@ -622,6 +625,7 @@ public void close() throws IOException {

public void setNetworkTimeout(int milliseconds) throws IOException {
connection.setSoTimeout(milliseconds);
pgInput.setTimeoutRequested(milliseconds != 0);
}

public int getNetworkTimeout() throws IOException {
@@ -8,6 +8,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;

/**
* A faster version of BufferedInputStream. Does no synchronisation and allows direct access to the
@@ -49,6 +50,11 @@ public class VisibleBufferedInputStream extends InputStream {
*/
private int endIndex;

/**
* socket timeout has been requested
*/
private boolean timeoutRequested = false;

/**
* Creates a new buffer around the given stream.
*
@@ -104,9 +110,22 @@ public byte readRaw() {
* @throws IOException If reading of the wrapped stream failed.
*/
public boolean ensureBytes(int n) throws IOException {
return ensureBytes(n, true);
}

/**
* Ensures that the buffer contains at least n bytes. This method invalidates the buffer and index
* fields.
*
* @param n The amount of bytes to ensure exists in buffer
* @param block whether or not to block the IO
* @return true if required bytes are available and false if EOF or the parameter block was false and socket timeout occurred.
* @throws IOException If reading of the wrapped stream failed.
*/
public boolean ensureBytes(int n, boolean block) throws IOException {
int required = n - endIndex + index;
while (required > 0) {
if (!readMore(required)) {
if (!readMore(required, block)) {
return false;
}
required = n - endIndex + index;
@@ -121,7 +140,7 @@ public boolean ensureBytes(int n) throws IOException {
* @return True if at least some bytes were read.
* @throws IOException If reading of the wrapped stream failed.
*/
private boolean readMore(int wanted) throws IOException {
private boolean readMore(int wanted, boolean block) throws IOException {
if (endIndex == index) {
index = 0;
endIndex = 0;
@@ -137,7 +156,20 @@ private boolean readMore(int wanted) throws IOException {
}
canFit = buffer.length - endIndex;
}
int read = wrapped.read(buffer, endIndex, canFit);
int read = 0;
try {
read = wrapped.read(buffer, endIndex, canFit);
if (!block && read == 0) {
return false;
}
} catch (SocketTimeoutException e) {
if (!block) {
return false;
}
if (timeoutRequested) {
throw e;
}
}
if (read < 0) {
return false;
}
@@ -211,7 +243,15 @@ public int read(byte[] to, int off, int len) throws IOException {

// then directly from wrapped stream
do {
int r = wrapped.read(to, off, len);
int r;
try {
r = wrapped.read(to, off, len);
} catch (SocketTimeoutException e) {
if (read == 0 && timeoutRequested) {
throw e;
}
return read;
}
if (r <= 0) {
return (read == 0) ? r : read;
}
@@ -287,10 +327,14 @@ public int scanCStringLength() throws IOException {
return pos - index;
}
}
if (!readMore(STRING_SCAN_SPAN)) {
if (!readMore(STRING_SCAN_SPAN, true)) {
throw new EOFException();
}
pos = index;
}
}

public void setTimeoutRequested(boolean timeoutRequested) {
this.timeoutRequested = timeoutRequested;
}
}
@@ -780,9 +780,9 @@ private void setSocketTimeout(int millis) throws PSQLException {
try {
Socket s = pgStream.getSocket();
if (!s.isClosed()) { // Is this check required?
pgStream.getSocket().setSoTimeout(millis);
pgStream.setNetworkTimeout(millis);
}
} catch (SocketException e) {
} catch (IOException e) {
throw new PSQLException(GT.tr("An error occurred while trying to reset the socket timeout."),
PSQLState.CONNECTION_FAILURE, e);
}

0 comments on commit 799e78d

Please sign in to comment.