Skip to content

Commit

Permalink
Support executeUpdate in JDBC driver
Browse files Browse the repository at this point in the history
This returns the number of affected rows from executeUpdate
and implements all variants of execute and executeUpdate.
  • Loading branch information
Stanislaw Ogorkis authored and electrum committed Dec 7, 2015
1 parent f400a0a commit da9f6bd
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 33 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -153,6 +153,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-blackhole</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-base-jdbc</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions presto-docs/src/main/sphinx/release/release-0.129.rst
Expand Up @@ -9,6 +9,13 @@ The scheduler can now be configured to take network topology into account when
scheduling splits. This is set using the ``node-scheduler.network-topology``
config. See :doc:`/admin/tuning` for more information.

General Changes
---------------

* The JDBC driver now properly supports non-query statements.
The ``Statement`` interface supports all variants of the ``execute`` methods.
It also supports the ``getUpdateCount`` and ``getLargeUpdateCount`` methods.

Hive Changes
------------

Expand Down
6 changes: 6 additions & 0 deletions presto-jdbc/pom.xml
Expand Up @@ -121,6 +121,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-blackhole</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
Expand Up @@ -1795,7 +1795,7 @@ protected Iterable<List<Object>> computeNext()
}
}

private static SQLException resultsException(QueryResults results)
static SQLException resultsException(QueryResults results)
{
QueryError error = requireNonNull(results.getError());
String message = format("Query failed (#%s): %s", results.getId(), error.getMessage());
Expand Down
135 changes: 107 additions & 28 deletions presto-jdbc/src/main/java/com/facebook/presto/jdbc/PrestoStatement.java
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.jdbc;

import com.facebook.presto.client.StatementClient;
import com.google.common.primitives.Ints;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -23,8 +24,10 @@
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.jdbc.PrestoResultSet.resultsException;
import static java.util.Objects.requireNonNull;

public class PrestoStatement
Expand All @@ -37,6 +40,7 @@ public class PrestoStatement
private final AtomicBoolean closeOnCompletion = new AtomicBoolean();
private final AtomicReference<PrestoConnection> connection;
private final AtomicReference<ResultSet> currentResult = new AtomicReference<>();
private final AtomicLong currentUpdateCount = new AtomicLong(-1);

PrestoStatement(PrestoConnection connection)
{
Expand All @@ -47,23 +51,10 @@ public class PrestoStatement
public ResultSet executeQuery(String sql)
throws SQLException
{
try {
StatementClient client = connection().startQuery(sql);
ResultSet result = new PrestoResultSet(client);
checkSetOrResetSession(client);
currentResult.set(result);
return result;
}
catch (RuntimeException e) {
throw new SQLException("Error executing query", e);
if (!execute(sql)) {
throw new SQLException("SQL statement is not a query: " + sql);
}
}

@Override
public int executeUpdate(String sql)
throws SQLException
{
throw new NotImplementedException("Statement", "executeUpdate");
return currentResult.get();
}

@Override
Expand Down Expand Up @@ -174,10 +165,55 @@ public void setCursorName(String name)
public boolean execute(String sql)
throws SQLException
{
clearCurrentResults();
checkOpen();
// Only support returning a single result set
currentResult.set(executeQuery(sql));
return true;

StatementClient client = null;
ResultSet resultSet = null;
try {
client = connection().startQuery(sql);
if (client.isFailed()) {
throw resultsException(client.finalResults());
}

resultSet = new PrestoResultSet(client);
checkSetOrResetSession(client);

// check if this is a query
if (client.current().getUpdateType() == null) {
currentResult.set(resultSet);
return true;
}

// this is an update, not a query
while (resultSet.next()) {
// ignore rows
}

Long updateCount = client.finalResults().getUpdateCount();
currentUpdateCount.set((updateCount != null) ? updateCount : 0);

return false;
}
catch (RuntimeException e) {
throw new SQLException("Error executing query", e);
}
finally {
if (currentResult.get() == null) {
if (resultSet != null) {
resultSet.close();
}
if (client != null) {
client.close();
}
}
}
}

private void clearCurrentResults()
{
currentResult.set(null);
currentUpdateCount.set(-1);
}

@Override
Expand All @@ -191,10 +227,16 @@ public ResultSet getResultSet()
@Override
public int getUpdateCount()
throws SQLException
{
return Ints.saturatedCast(getLargeUpdateCount());
}

@Override
public long getLargeUpdateCount()
throws SQLException
{
checkOpen();
// Updates are not allowed yet so return -1
return -1;
return currentUpdateCount.get();
}

@Override
Expand Down Expand Up @@ -316,46 +358,84 @@ public ResultSet getGeneratedKeys()
throw new SQLFeatureNotSupportedException("getGeneratedKeys");
}

@Override
public int executeUpdate(String sql)
throws SQLException
{
return Ints.saturatedCast(executeLargeUpdate(sql));
}

@Override
public int executeUpdate(String sql, int autoGeneratedKeys)
throws SQLException
{
throw new SQLFeatureNotSupportedException("executeUpdate");
return executeUpdate(sql);
}

@Override
public int executeUpdate(String sql, int[] columnIndexes)
throws SQLException
{
throw new SQLFeatureNotSupportedException("executeUpdate");
return executeUpdate(sql);
}

@Override
public int executeUpdate(String sql, String[] columnNames)
throws SQLException
{
throw new SQLFeatureNotSupportedException("executeUpdate");
return executeUpdate(sql);
}

@Override
public long executeLargeUpdate(String sql)
throws SQLException
{
if (execute(sql)) {
throw new SQLException("SQL is not an update statement: " + sql);
}
return currentUpdateCount.get();
}

@Override
public long executeLargeUpdate(String sql, int autoGeneratedKeys)
throws SQLException
{
return executeLargeUpdate(sql);
}

@Override
public long executeLargeUpdate(String sql, int[] columnIndexes)
throws SQLException
{
return executeLargeUpdate(sql);
}

@Override
public long executeLargeUpdate(String sql, String[] columnNames)
throws SQLException
{
return executeLargeUpdate(sql);
}

@Override
public boolean execute(String sql, int autoGeneratedKeys)
throws SQLException
{
throw new SQLFeatureNotSupportedException("execute");
return execute(sql);
}

@Override
public boolean execute(String sql, int[] columnIndexes)
throws SQLException
{
throw new SQLFeatureNotSupportedException("execute");
return execute(sql);
}

@Override
public boolean execute(String sql, String[] columnNames)
throws SQLException
{
throw new SQLFeatureNotSupportedException("execute");
return execute(sql);
}

@Override
Expand Down Expand Up @@ -451,7 +531,6 @@ private static boolean validFetchDirection(int direction)
private static void checkSetOrResetSession(StatementClient client) throws SQLException
{
if (!client.getSetSessionProperties().isEmpty() || !client.getResetSessionProperties().isEmpty()) {
client.close();
throw new SQLFeatureNotSupportedException(
"SET/RESET SESSION is not supported via JDBC. " +
"Use the setSessionProperty() method on PrestoConnection.");
Expand Down

0 comments on commit da9f6bd

Please sign in to comment.