Permalink
Browse files

fix: make warnings available as soon as they are received (#857)

Until REL9.4.1210 warnings were available via Statement#getWarnings()
and ResultSet#getWarnings() as soon as they were received from the
server. This commit returns to that behavior.
This is useful for long running queries, where it can be beneficial
to know about a warning before the query completes.

fixes #856
  • Loading branch information...
magJ authored and vlsi committed Oct 20, 2017
1 parent 0d8fde6 commit 83dd5fea94928b349e05c1417e264797052f2bbe
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2017, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.jdbc;

import java.sql.SQLWarning;

/**
* Wrapper class for SQLWarnings that provides an optimisation to add
* new warnings to the tail of the SQLWarning singly linked list, avoiding Θ(n) insertion time
* of calling #setNextWarning on the head. By encapsulating this into a single object it allows
* users(ie PgStatement) to atomically set and clear the warning chain.
*/
class PSQLWarningWrapper {

private final SQLWarning firstWarning;
private SQLWarning lastWarning;

PSQLWarningWrapper(SQLWarning warning) {
firstWarning = warning;
lastWarning = warning;
}

void addWarning(SQLWarning sqlWarning) {
lastWarning.setNextWarning(sqlWarning);
lastWarning = sqlWarning;
}

SQLWarning getFirstWarning() {
return firstWarning;
}

}
@@ -98,11 +98,7 @@
/**
* The warnings chain.
*/
protected SQLWarning warnings = null;
/**
* The last warning of the warning chain.
*/
protected SQLWarning lastWarning = null;
protected volatile PSQLWarningWrapper warnings = null;

/**
* Maximum number of rows to return, 0 = unlimited
@@ -218,13 +214,10 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
}

@Override
public void handleCompletion() throws SQLException {
SQLWarning warning = getWarning();
if (warning != null) {
PgStatement.this.addWarning(warning);
}
super.handleCompletion();
public void handleWarning(SQLWarning warning) {
PgStatement.this.addWarning(warning);
}

}

public java.sql.ResultSet executeQuery(String p_sql) throws SQLException {
@@ -536,25 +529,29 @@ public void setQueryTimeoutMs(int millis) throws SQLException {
}

/**
* This adds a warning to the warning chain. We track the tail of the warning chain as well to
* avoid O(N) behavior for adding a new warning to an existing chain. Some server functions which
* RAISE NOTICE (or equivalent) produce a ton of warnings.
* Either initializes new warning wrapper, or adds warning onto the chain.
*
* Although warnings are expected to be added sequentially, the warnings chain may be cleared
* concurrently at any time via {@link #clearWarnings()}, therefore it is possible that a warning
* added via this method is placed onto the end of the previous warning chain
*
* @param warn warning to add
*/
public void addWarning(SQLWarning warn) {
if (warnings == null) {
warnings = warn;
lastWarning = warn;
//copy reference to avoid NPE from concurrent modification of this.warnings
final PSQLWarningWrapper warnWrap = this.warnings;
if (warnWrap == null) {
this.warnings = new PSQLWarningWrapper(warn);
} else {
lastWarning.setNextWarning(warn);
lastWarning = warn;
warnWrap.addWarning(warn);
}
}

public SQLWarning getWarnings() throws SQLException {
checkClosed();
return warnings;
//copy reference to avoid NPE from concurrent modification of this.warnings
final PSQLWarningWrapper warnWrap = this.warnings;
return warnWrap != null ? warnWrap.getFirstWarning() : null;
}

public int getMaxFieldSize() throws SQLException {
@@ -571,9 +568,15 @@ public void setMaxFieldSize(int max) throws SQLException {
maxfieldSize = max;
}

/**
* Clears the warning chain.<p>
* Note that while it is safe to clear warnings while the query is executing, warnings that are
* added between calls to {@link #getWarnings()} and #clearWarnings() may be missed.
* Therefore you should hold a reference to the tail of the previous warning chain
* and verify if its {@link SQLWarning#getNextWarning()} value is holds any new value.
*/
public void clearWarnings() throws SQLException {
warnings = null;
lastWarning = null;
}

public java.sql.ResultSet getResultSet() throws SQLException {
@@ -25,8 +25,16 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

@@ -43,6 +51,7 @@ public void setUp() throws Exception {
TestUtil.createTempTable(con, "escapetest",
"ts timestamp, d date, t time, \")\" varchar(5), \"\"\"){a}'\" text ");
TestUtil.createTempTable(con, "comparisontest", "str1 varchar(5), str2 varchar(15)");
TestUtil.createTable(con, "test_lock", "name text");
Statement stmt = con.createStatement();
stmt.executeUpdate(TestUtil.insertSQL("comparisontest", "str1,str2", "'_abcd','_found'"));
stmt.executeUpdate(TestUtil.insertSQL("comparisontest", "str1,str2", "'%abcd','%found'"));
@@ -54,6 +63,9 @@ public void tearDown() throws Exception {
TestUtil.dropTable(con, "test_statement");
TestUtil.dropTable(con, "escapetest");
TestUtil.dropTable(con, "comparisontest");
TestUtil.dropTable(con, "test_lock");
con.createStatement().execute("DROP FUNCTION IF EXISTS notify_loop()");
con.createStatement().execute("DROP FUNCTION IF EXISTS notify_then_sleep()");
con.close();
}

@@ -417,6 +429,136 @@ public void testWarningsAreCleared() throws SQLException {
stmt.close();
}

@Test
public void testWarningsAreAvailableAsap()
throws Exception {
final Connection outerLockCon = TestUtil.openDB();
outerLockCon.setAutoCommit(false);
//Acquire an exclusive lock so we can block the notice generating statement
outerLockCon.createStatement().execute("LOCK TABLE test_lock IN ACCESS EXCLUSIVE MODE;");
con.createStatement()
.execute("CREATE OR REPLACE FUNCTION notify_then_sleep() RETURNS VOID AS "
+ "$BODY$ "
+ "BEGIN "
+ "RAISE NOTICE 'Test 1'; "
+ "RAISE NOTICE 'Test 2'; "
+ "LOCK TABLE test_lock IN ACCESS EXCLUSIVE MODE; "
+ "END "
+ "$BODY$ "
+ "LANGUAGE plpgsql;");
con.createStatement().execute("SET SESSION client_min_messages = 'NOTICE'");
//If we never receive the two warnings the statement will just hang, so set a low timeout
con.createStatement().execute("SET SESSION statement_timeout = 1000");
final PreparedStatement preparedStatement = con.prepareStatement("SELECT notify_then_sleep()");
final Callable<Void> warningReader = new Callable<Void>() {
@Override
public Void call() throws SQLException, InterruptedException {
while (true) {
SQLWarning warning = preparedStatement.getWarnings();
if (warning != null) {
assertEquals("First warning received not first notice raised",
"Test 1", warning.getMessage());
SQLWarning next = warning.getNextWarning();
if (next != null) {
assertEquals("Second warning received not second notice raised",
"Test 2", next.getMessage());
//Release the lock so that the notice generating statement can end.
outerLockCon.commit();
return null;
}
}
//Break the loop on InterruptedException
Thread.sleep(0);
}
}
};
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
Future<Void> future = executorService.submit(warningReader);
//Statement should only finish executing once we have
//received the two notices and released the outer lock.
preparedStatement.execute();

//If test takes longer than 2 seconds its a failure.
future.get(2, TimeUnit.SECONDS);
} finally {
executorService.shutdownNow();
}
}


/**
* Demonstrates a safe approach to concurrently reading the latest
* warnings while periodically clearing them.
*
* One drawback of this approach is that it requires the reader to make it to the end of the
* warning chain before clearing it, so long as your warning processing step is not very slow,
* this should happen more or less instantaneously even if you receive a lot of warnings.
*/
@Test
public void testConcurrentWarningReadAndClear()
throws SQLException, InterruptedException, ExecutionException, TimeoutException {
final int iterations = 1000;
con.createStatement()
.execute("CREATE OR REPLACE FUNCTION notify_loop() RETURNS VOID AS "
+ "$BODY$ "
+ "BEGIN "
+ "FOR i IN 1.. " + iterations + " LOOP "
+ " RAISE NOTICE 'Warning %', i; "
+ "END LOOP; "
+ "END "
+ "$BODY$ "
+ "LANGUAGE plpgsql;");
con.createStatement().execute("SET SESSION client_min_messages = 'NOTICE'");
final PreparedStatement statement = con.prepareStatement("SELECT notify_loop()");
final Callable<Void> warningReader = new Callable<Void>() {
@Override
public Void call() throws SQLException, InterruptedException {
SQLWarning lastProcessed = null;
int warnings = 0;
//For production code replace this with some condition that
//ends after the statement finishes execution
while (warnings < iterations) {
SQLWarning warn = statement.getWarnings();
//if next linked warning has value use that, otherwise keep using latest head
if (lastProcessed != null && lastProcessed.getNextWarning() != null) {
warn = lastProcessed.getNextWarning();
}
if (warn != null) {
warnings++;
//System.out.println("Processing " + warn.getMessage());
assertEquals("Received warning out of expected order",
"Warning " + warnings, warn.getMessage());
lastProcessed = warn;
//If the processed warning was the head of the chain clear
if (warn == statement.getWarnings()) {
//System.out.println("Clearing warnings");
statement.clearWarnings();
}
} else {
//Not required for this test, but a good idea adding some delay for production code
//to avoid high cpu usage while the query is running and no warnings are coming in.
//Alternatively use JDK9's Thread.onSpinWait()
Thread.sleep(10);
}
}
assertEquals("Didn't receive expected last warning",
"Warning " + iterations, lastProcessed.getMessage());
return null;
}
};

final ExecutorService executor = Executors.newSingleThreadExecutor();
try {
final Future warningReaderThread = executor.submit(warningReader);
statement.execute();
//If the reader doesn't return after 2 seconds, it failed.
warningReaderThread.get(2, TimeUnit.SECONDS);
} finally {
executor.shutdownNow();
}
}

/**
* The parser tries to break multiple statements into individual queries as required by the V3
* extended query protocol. It can be a little overzealous sometimes and this test ensures we keep

0 comments on commit 83dd5fe

Please sign in to comment.