Skip to content

Commit

Permalink
Allow Connection sources other than Driver in BaseJdbcClient
Browse files Browse the repository at this point in the history
A JDBC connector may want to utilize connection pooling or use
`DataSource` as the source of `Connection`s.  This change removes
`BaseJdbcClient`'s direct dependence on `Driver`, allowing such
connectors to extend `BaseJdbcClient`.
  • Loading branch information
findepi authored and kokosing committed Dec 8, 2017
1 parent 66b0365 commit 4902ba2
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 34 deletions.
Expand Up @@ -35,7 +35,6 @@

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -44,7 +43,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -100,33 +98,21 @@ public class BaseJdbcClient
.build();

protected final String connectorId;
protected final Driver driver;
protected final String connectionUrl;
protected final Properties connectionProperties;
protected final ConnectionFactory connectionFactory;
protected final String identifierQuote;

public BaseJdbcClient(JdbcConnectorId connectorId, BaseJdbcConfig config, String identifierQuote, Driver driver)
public BaseJdbcClient(JdbcConnectorId connectorId, BaseJdbcConfig config, String identifierQuote, ConnectionFactory connectionFactory)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
requireNonNull(config, "config is null"); // currently unused, retained as parameter for future extensions
this.identifierQuote = requireNonNull(identifierQuote, "identifierQuote is null");
this.driver = requireNonNull(driver, "driver is null");

requireNonNull(config, "config is null");
connectionUrl = config.getConnectionUrl();

connectionProperties = new Properties();
if (config.getConnectionUser() != null) {
connectionProperties.setProperty("user", config.getConnectionUser());
}
if (config.getConnectionPassword() != null) {
connectionProperties.setProperty("password", config.getConnectionPassword());
}
this.connectionFactory = requireNonNull(connectionFactory, "connectionFactory is null");
}

@Override
public Set<String> getSchemaNames()
{
try (Connection connection = driver.connect(connectionUrl, connectionProperties);
try (Connection connection = connectionFactory.openConnection();
ResultSet resultSet = connection.getMetaData().getSchemas()) {
ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
while (resultSet.next()) {
Expand All @@ -146,7 +132,7 @@ public Set<String> getSchemaNames()
@Override
public List<SchemaTableName> getTableNames(@Nullable String schema)
{
try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
try (Connection connection = connectionFactory.openConnection()) {
DatabaseMetaData metadata = connection.getMetaData();
if (metadata.storesUpperCaseIdentifiers() && (schema != null)) {
schema = schema.toUpperCase(ENGLISH);
Expand All @@ -168,7 +154,7 @@ public List<SchemaTableName> getTableNames(@Nullable String schema)
@Override
public JdbcTableHandle getTableHandle(SchemaTableName schemaTableName)
{
try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
try (Connection connection = connectionFactory.openConnection()) {
DatabaseMetaData metadata = connection.getMetaData();
String jdbcSchemaName = schemaTableName.getSchemaName();
String jdbcTableName = schemaTableName.getTableName();
Expand Down Expand Up @@ -203,7 +189,7 @@ public JdbcTableHandle getTableHandle(SchemaTableName schemaTableName)
@Override
public List<JdbcColumnHandle> getColumns(JdbcTableHandle tableHandle)
{
try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
try (Connection connection = connectionFactory.openConnection()) {
try (ResultSet resultSet = getColumns(tableHandle, connection.getMetaData())) {
List<JdbcColumnHandle> columns = new ArrayList<>();
boolean found = false;
Expand Down Expand Up @@ -250,7 +236,7 @@ public ConnectorSplitSource getSplits(JdbcTableLayoutHandle layoutHandle)
public Connection getConnection(JdbcSplit split)
throws SQLException
{
Connection connection = driver.connect(connectionUrl, connectionProperties);
Connection connection = connectionFactory.openConnection();
try {
connection.setReadOnly(true);
}
Expand Down Expand Up @@ -297,7 +283,7 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorTableMetadata tableMetada
throw new PrestoException(NOT_FOUND, "Schema not found: " + schema);
}

try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
try (Connection connection = connectionFactory.openConnection()) {
boolean uppercase = connection.getMetaData().storesUpperCaseIdentifiers();
if (uppercase) {
schema = schema.toUpperCase(ENGLISH);
Expand Down Expand Up @@ -392,7 +378,7 @@ public void dropTable(JdbcTableHandle handle)
.append("DROP TABLE ")
.append(quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()));

try (Connection connection = driver.connect(connectionUrl, connectionProperties)) {
try (Connection connection = connectionFactory.openConnection()) {
execute(connection, sql.toString());
}
catch (SQLException e) {
Expand Down Expand Up @@ -426,7 +412,7 @@ public String buildInsertSql(JdbcOutputTableHandle handle)
public Connection getConnection(JdbcOutputTableHandle handle)
throws SQLException
{
return driver.connect(connectionUrl, connectionProperties);
return connectionFactory.openConnection();
}

@Override
Expand Down
@@ -0,0 +1,23 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import java.sql.Connection;
import java.sql.SQLException;

public interface ConnectionFactory
{
Connection openConnection()
throws SQLException;
}
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
import java.util.Properties;

import static java.util.Objects.requireNonNull;

public class DriverConnectionFactory
implements ConnectionFactory
{
private final Driver driver;
private final String connectionUrl;
private final Properties connectionProperties;

public DriverConnectionFactory(Driver driver, BaseJdbcConfig config)
{
this(driver, config.getConnectionUrl(), basicConnectionProperties(config));
}

public static Properties basicConnectionProperties(BaseJdbcConfig config)
{
Properties connectionProperties = new Properties();
if (config.getConnectionUser() != null) {
connectionProperties.setProperty("user", config.getConnectionUser());
}
if (config.getConnectionPassword() != null) {
connectionProperties.setProperty("password", config.getConnectionPassword());
}
return connectionProperties;
}

public DriverConnectionFactory(Driver driver, String connectionUrl, Properties connectionProperties)
{
this.driver = requireNonNull(driver, "driver is null");
this.connectionUrl = requireNonNull(connectionUrl, "connectionUrl is null");
this.connectionProperties = new Properties();
this.connectionProperties.putAll(requireNonNull(connectionProperties, "basicConnectionProperties is null"));
}

@Override
public Connection openConnection()
throws SQLException
{
return driver.connect(connectionUrl, connectionProperties);
}
}
Expand Up @@ -24,6 +24,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand All @@ -43,9 +44,9 @@ public TestingDatabase()
String connectionUrl = "jdbc:h2:mem:test" + System.nanoTime();
jdbcClient = new BaseJdbcClient(
new JdbcConnectorId(CONNECTOR_ID),
new BaseJdbcConfig().setConnectionUrl(connectionUrl),
new BaseJdbcConfig(),
"\"",
new Driver());
new DriverConnectionFactory(new Driver(), connectionUrl, new Properties()));

connection = DriverManager.getConnection(connectionUrl);
connection.createStatement().execute("CREATE SCHEMA example");
Expand Down
Expand Up @@ -36,7 +36,7 @@ public void configure(Binder binder)
@Provides
public JdbcClient provideJdbcClient(JdbcConnectorId id, BaseJdbcConfig config)
{
return new BaseJdbcClient(id, config, "\"", new Driver());
return new BaseJdbcClient(id, config, "\"", new DriverConnectionFactory(new Driver(), config));
}

public static Map<String, String> createProperties()
Expand Down
Expand Up @@ -15,6 +15,8 @@

import com.facebook.presto.plugin.jdbc.BaseJdbcClient;
import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
import com.facebook.presto.plugin.jdbc.ConnectionFactory;
import com.facebook.presto.plugin.jdbc.DriverConnectionFactory;
import com.facebook.presto.plugin.jdbc.JdbcConnectorId;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.type.Type;
Expand All @@ -31,8 +33,10 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Set;

import static com.facebook.presto.plugin.jdbc.DriverConnectionFactory.basicConnectionProperties;
import static java.util.Locale.ENGLISH;

public class MySqlClient
Expand All @@ -42,7 +46,13 @@ public class MySqlClient
public MySqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config, MySqlConfig mySqlConfig)
throws SQLException
{
super(connectorId, config, "`", new Driver());
super(connectorId, config, "`", connectionFactory(config, mySqlConfig));
}

private static ConnectionFactory connectionFactory(BaseJdbcConfig config, MySqlConfig mySqlConfig)
throws SQLException
{
Properties connectionProperties = basicConnectionProperties(config);
connectionProperties.setProperty("nullCatalogMeansCurrent", "false");
connectionProperties.setProperty("useUnicode", "true");
connectionProperties.setProperty("characterEncoding", "utf8");
Expand All @@ -54,13 +64,15 @@ public MySqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config, MySqlConf
if (mySqlConfig.getConnectionTimeout() != null) {
connectionProperties.setProperty("connectTimeout", String.valueOf(mySqlConfig.getConnectionTimeout().toMillis()));
}

return new DriverConnectionFactory(new Driver(), config.getConnectionUrl(), connectionProperties);
}

@Override
public Set<String> getSchemaNames()
{
// for MySQL, we need to list catalogs instead of schemas
try (Connection connection = driver.connect(connectionUrl, connectionProperties);
try (Connection connection = connectionFactory.openConnection();
ResultSet resultSet = connection.getMetaData().getCatalogs()) {
ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
while (resultSet.next()) {
Expand Down
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.plugin.jdbc.BaseJdbcClient;
import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
import com.facebook.presto.plugin.jdbc.DriverConnectionFactory;
import com.facebook.presto.plugin.jdbc.JdbcConnectorId;
import com.facebook.presto.plugin.jdbc.JdbcOutputTableHandle;
import org.postgresql.Driver;
Expand All @@ -34,7 +35,7 @@ public class PostgreSqlClient
public PostgreSqlClient(JdbcConnectorId connectorId, BaseJdbcConfig config)
throws SQLException
{
super(connectorId, config, "\"", new Driver());
super(connectorId, config, "\"", new DriverConnectionFactory(new Driver(), config));
}

@Override
Expand Down
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.plugin.jdbc.BaseJdbcClient;
import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
import com.facebook.presto.plugin.jdbc.DriverConnectionFactory;
import com.facebook.presto.plugin.jdbc.JdbcConnectorId;
import com.facebook.presto.plugin.jdbc.JdbcOutputTableHandle;
import com.facebook.presto.spi.PrestoException;
Expand All @@ -35,7 +36,7 @@ public class RedshiftClient
public RedshiftClient(JdbcConnectorId connectorId, BaseJdbcConfig config)
throws SQLException
{
super(connectorId, config, "\"", new Driver());
super(connectorId, config, "\"", new DriverConnectionFactory(new Driver(), config));
}

@Override
Expand Down
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.plugin.jdbc.BaseJdbcClient;
import com.facebook.presto.plugin.jdbc.BaseJdbcConfig;
import com.facebook.presto.plugin.jdbc.DriverConnectionFactory;
import com.facebook.presto.plugin.jdbc.JdbcConnectorId;
import com.facebook.presto.plugin.jdbc.JdbcOutputTableHandle;
import com.facebook.presto.spi.PrestoException;
Expand All @@ -34,7 +35,7 @@ public class SqlServerClient
public SqlServerClient(JdbcConnectorId connectorId, BaseJdbcConfig config)
throws SQLException
{
super(connectorId, config, "\"", new SQLServerDriver());
super(connectorId, config, "\"", new DriverConnectionFactory(new SQLServerDriver(), config));
}

@Override
Expand Down

0 comments on commit 4902ba2

Please sign in to comment.