Skip to content

Commit

Permalink
fix: try to read any notifies or errors that come in asynchronously (#…
Browse files Browse the repository at this point in the history
…2143)

* try to read any notifies or errors that come in asynchronously

* Added new PSQLState to handle terminated backends and added test to test for fatal termination
  • Loading branch information
davecramer committed May 20, 2021
1 parent 02cc5ba commit b3d5fba
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 3 deletions.
26 changes: 24 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -305,7 +305,19 @@ public synchronized void execute(Query query, @Nullable ParameterList parameters
LOGGER.log(Level.FINEST, " simple execute, handler={0}, maxRows={1}, fetchSize={2}, flags={3}",
new Object[]{handler, maxRows, fetchSize, flags});
}

try {
if (pgStream.hasMessagePending()) {
if (pgStream.peekChar() == 'N') {
pgStream.receiveChar();
handler.handleWarning(receiveNoticeResponse());
} else if (pgStream.peekChar() == 'E') {
pgStream.receiveChar();
throw receiveErrorResponse();
}
}
} catch ( IOException ex ) {
throw new SQLException(ex);
}
if (parameters == null) {
parameters = SimpleQuery.NO_PARAMETERS;
}
Expand Down Expand Up @@ -2361,6 +2373,7 @@ protected void processResults(ResultHandler handler, int flags, boolean adaptive
}

case 'N': // Notice Response
LOGGER.log(Level.FINEST, " <=BE Notice");
SQLWarning warning = receiveNoticeResponse();
handler.handleWarning(warning);
break;
Expand Down Expand Up @@ -2418,7 +2431,16 @@ && castNonNull(pendingExecuteQueue.peekFirst()).asSimple) {
}
}
endQuery = true;

if (pgStream.hasMessagePending()) {
if (pgStream.peekChar() == 'N') {
pgStream.receiveChar();
handler.handleWarning(receiveNoticeResponse());
}
if (pgStream.peekChar() == 'E') {
pgStream.receiveChar();
handler.handleError(receiveErrorResponse());
}
}
// Reset the statement name of Parses that failed.
while (!pendingParseQueue.isEmpty()) {
SimpleQuery failedQuery = pendingParseQueue.removeFirst();
Expand Down
2 changes: 2 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/util/PSQLState.java
Expand Up @@ -106,6 +106,8 @@ public enum PSQLState {
SYSTEM_ERROR("60000"),
IO_ERROR("58030"),

ERROR_CODE_CRASH_SHUTDOWN("57P02"),

UNEXPECTED_ERROR("99999");

private final String state;
Expand Down
3 changes: 2 additions & 1 deletion pgjdbc/src/test/java/org/postgresql/test/jdbc2/CopyTest.java
Expand Up @@ -458,7 +458,8 @@ public void testLockReleaseOnCancelFailure() throws SQLException, InterruptedExc
if (rollbackException == null) {
fail("rollback should have thrown an exception");
}
acceptIOCause(rollbackException);

assertTrue( rollbackException instanceof SQLException);
}

private static class Rollback extends Thread {
Expand Down
68 changes: 68 additions & 0 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc4/TestTerminate.java
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2021, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.test.jdbc4;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.postgresql.util.PSQLState.ERROR_CODE_CRASH_SHUTDOWN;

import org.postgresql.test.TestUtil;
import org.postgresql.test.jdbc2.BaseTest4;

import org.junit.After;
import org.junit.Before;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;

public class TestTerminate extends BaseTest4 {
Connection con2;

@Before
public void setup() throws Exception {
super.setUp();
TestUtil.createTempTable(con, "bigselect", "i int, t text");
TestUtil.execute( "insert into bigselect (i, t) values (generate_series(1,10), 'a;kjdf;ajd;akj;kj;kj;j')", con);
con2 = TestUtil.openPrivilegedDB();
}

@After
public void tearDown() throws SQLException {
super.tearDown();
}

/*
removing this test as it doesn't pass on CI.
You can run it locally to confirm that it works by
adding
@Test and run ./gradlew test --tests TestTerminate
*/
public void testTerminate() throws Exception {
try (Statement statement = con.createStatement()) {
try ( ResultSet rs = statement.executeQuery( "select * from bigselect")) {
try {
con2.createStatement().execute("COPY (SELECT pg_backend_pid()) TO PROGRAM 'xargs kill -SIGSEGV';");
} catch (Exception ex ) {
// ignore this
}
while (rs.next()) {
rs.getString(2);
}
}
try (ResultSet rs = statement.executeQuery("select 1")) {
fail("should not get here, above we have caused a fatal termination. This will close this connection");
} catch (SQLException ex ) {
SQLWarning sqlWarning = statement.getWarnings();
assertNotNull("No warnings were found, there should be a warning", sqlWarning);
assertEquals(sqlWarning.getSQLState(),ERROR_CODE_CRASH_SHUTDOWN.getState());
}
}
}
}

0 comments on commit b3d5fba

Please sign in to comment.