Skip to content

Commit

Permalink
feat: improve waiting for notifications by providing a timeout option (
Browse files Browse the repository at this point in the history
…#778)

* Improves waiting for notifications by adding a timeout to the PGConnection.getNotifications() method. The change does not have any effects for users that don't use this functionality and the API remain fully backwards compatible.
  • Loading branch information
dmigowski authored and davecramer committed Mar 23, 2017
1 parent ef8c6f9 commit a7e0c83
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 35 deletions.
34 changes: 16 additions & 18 deletions docs/documentation/head/listennotify.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ any outstanding notifications.

> A key limitation of the JDBC driver is that it cannot receive asynchronous
notifications and must poll the backend to check if any notifications were issued.
I timeout can be given to the poll function, but then the execution of statements
from other threads will block.

<a name="listen-notify-example"></a>
**Example 9.2. Receiving Notifications**
Expand Down Expand Up @@ -72,20 +74,16 @@ class Listener extends Thread

public void run()
{
while (true)
try
{
try
while (true)
{
// issue a dummy query to contact the backend
// and receive any pending notifications.

Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();

org.postgresql.PGNotification notifications[] = pgconn.getNotifications();

// If this thread is the only one that uses the connection, a timeout can be used to
// receive notifications immediately:
// org.postgresql.PGNotification notifications[] = pgconn.getNotifications(10000);

if (notifications != null)
{
for (int i=0; i&lt;notifications.length; i++)
Expand All @@ -97,14 +95,14 @@ class Listener extends Thread

Thread.sleep(500);
}
catch (SQLException sqle)
{
sqle.printStackTrace();
}
catch (InterruptedException ie)
{
ie.printStackTrace();
}
}
catch (SQLException sqle)
{
sqle.printStackTrace();
}
catch (InterruptedException ie)
{
ie.printStackTrace();
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/PGConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ public interface PGConnection {
*/
PGNotification[] getNotifications() throws SQLException;

/**
* This method returns any notifications that have been received since the last call to this
* method. Returns null if there have been no notifications. A timeout can be specified so the
* driver waits for notifications.
*
* @param timeoutMillis when 0, blocks forever. when &gt; 0, blocks up to the specified number of millies
* or until at least one notification has been received. If more than one notification is
* about to be received, these will be returned in one batch.
* @return notifications that have been received
* @throws SQLException if something wrong happens
* @since 43
*/
PGNotification[] getNotifications(int timeoutMillis) throws SQLException;

/**
* This returns the COPY API for the current connection.
*
Expand Down
10 changes: 10 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/QueryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ Object createQueryKey(String sql, boolean escapeProcessing, boolean isParameteri
*/
void processNotifies() throws SQLException;

/**
* Prior to attempting to retrieve notifications, we need to pull any recently received
* notifications off of the network buffers. The notification retrieval in ProtocolConnection
* cannot do this as it is prone to deadlock, so the higher level caller must be responsible which
* requires exposing this method. This variant supports blocking for the given time in millis.
*
* @throws SQLException if and error occurs while fetching notifications
*/
void processNotifies(int timeoutMillis) throws SQLException;

//
// Fastpath interface.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,4 +375,8 @@ public boolean isFlushCacheOnDeallocate() {
public void setFlushCacheOnDeallocate(boolean flushCacheOnDeallocate) {
this.flushCacheOnDeallocate = flushCacheOnDeallocate;
}

protected boolean hasNotifications() {
return notifications.size() > 0;
}
}
68 changes: 66 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
Expand Down Expand Up @@ -644,35 +647,96 @@ private void sendFastpathCall(int fnid, SimpleParameterList params)
pgStream.flush();
}

// Just for API compatibility with previous versions.
public synchronized void processNotifies() throws SQLException {
processNotifies(-1);
}

/**
* @param timeoutMillis when &gt; 0, block for this time
* when =0, block forever
* when &lt; 0, don't block
*/
public synchronized void processNotifies(int timeoutMillis) throws SQLException {
waitOnLock();
// Asynchronous notifies only arrive when we are not in a transaction
if (getTransactionState() != TransactionState.IDLE) {
return;
}

if (hasNotifications()) {
// No need to timeout when there are already notifications. We just check for more in this case.
timeoutMillis = -1;
}

boolean useTimeout = timeoutMillis > 0;
long startTime = 0;
int oldTimeout = 0;
if (useTimeout) {
startTime = System.currentTimeMillis();
try {
oldTimeout = pgStream.getSocket().getSoTimeout();
} catch (SocketException e) {
throw new PSQLException(GT.tr("An error occurred while trying to get the socket "
+ "timeout."), PSQLState.CONNECTION_FAILURE, e);
}
}

try {
while (pgStream.hasMessagePending()) {
while (pgStream.hasMessagePending() || timeoutMillis >= 0 ) {
if (useTimeout && timeoutMillis >= 0) {
setSocketTimeout(timeoutMillis);
}
int c = pgStream.receiveChar();
if (useTimeout && timeoutMillis >= 0) {
setSocketTimeout(0); // Don't timeout after first char
}
switch (c) {
case 'A': // Asynchronous Notify
receiveAsyncNotify();
break;
timeoutMillis = -1;
continue;
case 'E':
// Error Response (response to pretty much everything; backend then skips until Sync)
throw receiveErrorResponse();
case 'N': // Notice Response (warnings / info)
SQLWarning warning = receiveNoticeResponse();
addWarning(warning);
if (useTimeout) {
long newTimeMillis = System.currentTimeMillis();
timeoutMillis += startTime - newTimeMillis; // Overflows after 49 days, ignore that
startTime = newTimeMillis;
if (timeoutMillis == 0) {
timeoutMillis = -1; // Don't accidentially wait forever
}
}
break;
default:
throw new PSQLException(GT.tr("Unknown Response Type {0}.", (char) c),
PSQLState.CONNECTION_FAILURE);
}
}
} catch (SocketTimeoutException ioe) {
// No notifications this time...
} catch (IOException ioe) {
throw new PSQLException(GT.tr("An I/O error occurred while sending to the backend."),
PSQLState.CONNECTION_FAILURE, ioe);
} finally {
if (useTimeout) {
setSocketTimeout(oldTimeout);
}
}
}

private void setSocketTimeout(int millis) throws PSQLException {
try {
Socket s = pgStream.getSocket();
if (!s.isClosed()) { // Is this check required?
pgStream.getSocket().setSoTimeout(millis);
}
} catch (SocketException e) {
throw new PSQLException(GT.tr("An error occurred while trying to reset the socket timeout."),
PSQLState.CONNECTION_FAILURE, e);
}
}

Expand Down
7 changes: 6 additions & 1 deletion pgjdbc/src/main/java/org/postgresql/jdbc/PgConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -973,8 +973,13 @@ public void cancelQuery() throws SQLException {

@Override
public PGNotification[] getNotifications() throws SQLException {
return getNotifications(-1);
}

@Override
public PGNotification[] getNotifications(int timeoutMillis) throws SQLException {
checkClosed();
getQueryExecutor().processNotifies();
getQueryExecutor().processNotifies(timeoutMillis);
// Backwards-compatibility hand-holding.
PGNotification[] notifications = queryExecutor.getNotifications();
return (notifications.length == 0 ? null : notifications);
Expand Down
Loading

0 comments on commit a7e0c83

Please sign in to comment.