Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 3 additions & 80 deletions src/main/java/org/embulk/output/DatabricksOutputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.embulk.output;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import org.embulk.config.ConfigDiff;
Expand Down Expand Up @@ -171,85 +169,10 @@ protected void logConnectionProperties(String url, Properties props) {
super.logConnectionProperties(url, maskedProps);
}

/**
 * Looks up the column metadata of {@code table} and returns it as a {@link JdbcSchema}.
 *
 * <p>This is almost a copy of the AbstractJdbcOutputPlugin implementation, except for the
 * validation that the table exists in the current schema.
 *
 * <p>NOTE(review): the statements below already return on every path before the final
 * {@code return super.newJdbcSchemaFromTableIfExists(...)} is reached; confirm which of the two
 * implementations is the intended one.
 *
 * @param connection connection used to read the metadata; it is cast to
 *     DatabricksOutputConnection below
 * @param table identifier of the table to inspect
 * @return the schema, or an empty Optional when the table does not exist or no columns were found
 * @throws SQLException if reading the database metadata fails
 */
public Optional<JdbcSchema> newJdbcSchemaFromTableIfExists(
JdbcOutputConnection connection, TableIdentifier table) throws SQLException {
if (!connection.tableExists(table)) {
// DatabaseMetaData.getPrimaryKeys fails if the table does not exist.
return Optional.empty();
}

DatabaseMetaData dbm = connection.getMetaData();
String escape = dbm.getSearchStringEscape();

// Collect the primary key column names first; they are used to flag unique-key columns below.
ResultSet rs =
dbm.getPrimaryKeys(table.getDatabase(), table.getSchemaName(), table.getTableName());
final HashSet<String> primaryKeysBuilder = new HashSet<>();
try {
while (rs.next()) {
// Skip rows that the connection does not accept as belonging to this table.
if (!((DatabricksOutputConnection) connection)
.isAvailableTableMetadataInConnection(rs, table)) {
continue;
}
primaryKeysBuilder.add(rs.getString("COLUMN_NAME"));
}
} finally {
rs.close();
}
final Set<String> primaryKeys = Collections.unmodifiableSet(primaryKeysBuilder);

final ArrayList<JdbcColumn> builder = new ArrayList<>();
// NOTE: Columns of type TIMESTAMP_NTZ and INTERVAL are not included in the getColumns result.
// This causes a runtime SQL exception on COPY INTO
// (probably because they are unsupported by the Databricks JDBC driver).
// https://docs.databricks.com/en/sql/language-manual/data-types/interval-type.html
// https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html#notes
rs =
dbm.getColumns(
JdbcUtils.escapeSearchString(table.getDatabase(), escape),
JdbcUtils.escapeSearchString(table.getSchemaName(), escape),
JdbcUtils.escapeSearchString(table.getTableName(), escape),
null);
try {
while (rs.next()) {
// Skip rows that the connection does not accept as belonging to this table.
if (!((DatabricksOutputConnection) connection)
.isAvailableTableMetadataInConnection(rs, table)) {
continue;
}
String columnName = rs.getString("COLUMN_NAME");
String simpleTypeName = rs.getString("TYPE_NAME").toUpperCase(Locale.ENGLISH);
boolean isUniqueKey = primaryKeys.contains(columnName);
int sqlType = rs.getInt("DATA_TYPE");
int colSize = rs.getInt("COLUMN_SIZE");
int decDigit = rs.getInt("DECIMAL_DIGITS");
// DECIMAL_DIGITS was SQL NULL: record -1 instead of the value getInt returned.
if (rs.wasNull()) {
decDigit = -1;
}
int charOctetLength = rs.getInt("CHAR_OCTET_LENGTH");
boolean isNotNull = "NO".equals(rs.getString("IS_NULLABLE"));
// rs.getString("COLUMN_DEF") // or null // TODO
builder.add(
JdbcColumn.newGenericTypeColumn(
columnName,
sqlType,
simpleTypeName,
colSize,
decDigit,
charOctetLength,
isNotNull,
isUniqueKey));
// We can't get the declared column name using the JDBC API.
// Subclasses need to overwrite it.
}
} finally {
rs.close();
}
final List<JdbcColumn> columns = Collections.unmodifiableList(builder);
// A table for which no column rows were collected is reported as absent.
if (columns.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(new JdbcSchema(columns));
}
// Delegates to the superclass, passing the identifier returned by
// currentConnectionTableIdentifier(table) instead of the raw one.
return super.newJdbcSchemaFromTableIfExists(
connection,
((DatabricksOutputConnection) connection).currentConnectionTableIdentifier(table));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,54 +42,21 @@ protected void executeUseStatement(String sql) throws SQLException {
// This is almost a copy of the JdbcOutputConnection implementation, except for the validation
// that the table exists in the current schema.
// NOTE(review): the loop below already returns on every path before the final
// `return super.tableExists(...)` is reached; confirm which implementation is intended.
public boolean tableExists(TableIdentifier table) throws SQLException {
try (ResultSet rs =
connection
.getMetaData()
.getTables(table.getDatabase(), table.getSchemaName(), table.getTableName(), null)) {
while (rs.next()) {
// Only rows accepted by isAvailableTableMetadataInConnection count as a match.
if (isAvailableTableMetadataInConnection(rs, table)) {
return true;
}
}
}
return false;
// Delegates to the superclass with the identifier returned by
// currentConnectionTableIdentifier(table).
return super.tableExists(currentConnectionTableIdentifier(table));
}

// Decides whether a metadata row (read via its TABLE_CAT / TABLE_SCHEM columns) should be
// treated as describing the given table for this connection.
public boolean isAvailableTableMetadataInConnection(ResultSet rs, TableIdentifier tableIdentifier)
throws SQLException {
// If unchecked, tables in other catalogs may appear to exist.
// This is because the base embulk jdbc plugin's tableIdentifier.getDatabase() often returns
// null, and one Databricks connection has multiple available catalogs (databases).

// When the identifier carries no database, compare the row's catalog with catalogName.
// NOTE(review): rs.getString("TABLE_CAT") is dereferenced without a null check; confirm the
// driver never returns null here.
if (tableIdentifier.getDatabase() == null) {
logger.trace("tableIdentifier.getDatabase() == null, check by instance variable");
if (!rs.getString("TABLE_CAT").equalsIgnoreCase(catalogName)) {
return false;
}
}
// When the identifier carries no schema, compare the row's schema with schemaName.
if (tableIdentifier.getSchemaName() == null) {
logger.trace("tableIdentifier.getSchemaName() == null, check by instance variable");
if (!rs.getString("TABLE_SCHEM").equalsIgnoreCase(schemaName)) {
return false;
}
}

// An explicit database or schema that differs from the instance variables is only logged;
// the row is still accepted.
// NOTE(review): the first message below says getSchemaName() although it reports the
// database; it looks like a copy-paste slip.
if (tableIdentifier.getDatabase() != null
&& !tableIdentifier.getDatabase().equalsIgnoreCase(catalogName)) {
logger.error(
String.format(
"tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
tableIdentifier.getDatabase(), catalogName));
}
if (tableIdentifier.getSchemaName() != null
&& !tableIdentifier.getSchemaName().equalsIgnoreCase(schemaName)) {
logger.error(
String.format(
"tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
tableIdentifier.getSchemaName(), schemaName));
}
return true;
// Returns a copy of the identifier whose null database / schema are replaced by catalogName /
// schemaName; the table name is passed through unchanged.
public TableIdentifier currentConnectionTableIdentifier(TableIdentifier tableIdentifier) {
// Caution:
// JdbcOutputPlugin sometimes uses a tableIdentifier whose database is null,
// which causes unexpected DatabaseMetaData behavior in AbstractJdbcOutputPlugin.
// For example, getTables and getColumns then search in all catalogs,
// not just the connection's default one,
// and cannot search in schemas with a multibyte name.
// So, if the tableIdentifier's database (or schema) is null, the connection's default value
// is used instead.
return new TableIdentifier(
tableIdentifier.getDatabase() != null ? tableIdentifier.getDatabase() : catalogName,
tableIdentifier.getSchemaName() != null ? tableIdentifier.getSchemaName() : schemaName,
tableIdentifier.getTableName());
}

@Override
Expand Down
130 changes: 130 additions & 0 deletions src/test/java/org/embulk/output/databricks/TestDatabaseMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package org.embulk.output.databricks;

import static java.lang.String.format;
import static org.embulk.output.databricks.util.ConnectionUtil.*;
import static org.junit.Assert.assertEquals;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.embulk.output.databricks.util.ConfigUtil;
import org.embulk.output.jdbc.JdbcUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Pins down how the JDBC driver's {@link DatabaseMetaData} behaves for ASCII and non-ASCII
 * catalog, schema and table names, with and without an explicit catalog.
 *
 * <p>The purpose of this class is to understand the behavior of DatabaseMetaData, not to verify
 * plugin logic. If a test fails because of a library update, update the expected values to match
 * the new behavior.
 */
public class TestDatabaseMetadata {
  private DatabaseMetaData dbm;
  private Connection conn;

  ConfigUtil.TestTask t = ConfigUtil.createTestTask();
  String catalog = t.getCatalogName();
  String schema = t.getSchemaName();
  String table = t.getTablePrefix() + "_test";
  String nonAsciiCatalog = t.getNonAsciiCatalogName();
  String nonAsciiSchema = t.getNonAsciiSchemaName();
  String nonAsciiTable = t.getTablePrefix() + "_テスト";

  /** Opens the connection, selects the ASCII catalog and schema, and creates the fixtures. */
  @Before
  public void setup() throws SQLException, ClassNotFoundException {
    conn = connectByTestTask();
    dbm = conn.getMetaData();
    run(conn, "USE CATALOG " + catalog);
    run(conn, "USE SCHEMA " + schema);
    createTables();
  }

  /** Closes the connection (best effort) and drops the temporary tables. */
  @After
  public void cleanup() {
    // JUnit runs @After even when @Before threw, so the connection may never have been opened.
    // Guarding against null keeps the original setup failure visible and still drops the tables.
    if (conn != null) {
      try {
        conn.close();
      } catch (SQLException ignored) {
        // Best-effort close: a failure here must not prevent the table cleanup below.
      }
    }
    dropAllTemporaryTables();
  }

  /** Each lookup is expected to yield exactly one primary key row, with or without a catalog. */
  @Test
  public void testGetPrimaryKeys() throws SQLException {
    assertEquals(1, countPrimaryKeys(catalog, schema, table, "a0"));
    assertEquals(1, countPrimaryKeys(null, schema, table, "a0"));
    assertEquals(1, countPrimaryKeys(nonAsciiCatalog, nonAsciiSchema, nonAsciiTable, "h0"));
    assertEquals(1, countPrimaryKeys(null, nonAsciiSchema, nonAsciiTable, "d0"));
  }

  /** A null catalog matches the table in both catalogs, except for non-ASCII schema names. */
  @Test
  public void testGetTables() throws SQLException {
    assertEquals(1, countTablesResult(catalog, schema, table));
    assertEquals(2, countTablesResult(null, schema, table));
    assertEquals(1, countTablesResult(nonAsciiCatalog, nonAsciiSchema, nonAsciiTable));
    assertEquals(0, countTablesResult(null, nonAsciiSchema, nonAsciiTable)); // expected 2
  }

  /** Same as {@link #testGetTables()}, counting column rows (two columns per table). */
  @Test
  public void testGetColumns() throws SQLException {
    assertEquals(2, countColumnsResult(catalog, schema, table));
    assertEquals(4, countColumnsResult(null, schema, table));
    assertEquals(2, countColumnsResult(nonAsciiCatalog, nonAsciiSchema, nonAsciiTable));
    // NOTE(review): the original note reads "expected 2"; by analogy with the ASCII case above
    // (4 rows for a null catalog) the ideal value may be 4 -- confirm.
    assertEquals(0, countColumnsResult(null, nonAsciiSchema, nonAsciiTable)); // expected 2
  }

  /**
   * Creates the same two-column table in every combination of (ASCII, non-ASCII) catalog, schema
   * and table name. The primary key column name differs per table (a0 .. h0) so that a lookup
   * result can be traced back to the table it came from.
   */
  private void createTables() {
    String queryFormat =
        "CREATE TABLE IF NOT EXISTS `%s`.`%s`.`%s` (%s String PRIMARY KEY, %s INTEGER)";
    run(conn, format(queryFormat, catalog, schema, table, "a0", "a1"));
    run(conn, format(queryFormat, catalog, schema, nonAsciiTable, "b0", "b1"));
    run(conn, format(queryFormat, catalog, nonAsciiSchema, table, "c0", "c1"));
    run(conn, format(queryFormat, catalog, nonAsciiSchema, nonAsciiTable, "d0", "d1"));
    run(conn, format(queryFormat, nonAsciiCatalog, schema, table, "e0", "e1"));
    run(conn, format(queryFormat, nonAsciiCatalog, schema, nonAsciiTable, "f0", "f1"));
    run(conn, format(queryFormat, nonAsciiCatalog, nonAsciiSchema, table, "g0", "g1"));
    run(conn, format(queryFormat, nonAsciiCatalog, nonAsciiSchema, nonAsciiTable, "h0", "h1"));
  }

  /**
   * Counts the rows returned by {@code getPrimaryKeys} and asserts that every row names the
   * expected primary key column.
   *
   * @param catalogName catalog to search, or null to pass no catalog
   * @param schemaName schema to search
   * @param tableName table to search
   * @param primaryKey column name every returned row must carry
   * @return number of primary key rows returned
   */
  private int countPrimaryKeys(
      String catalogName, String schemaName, String tableName, String primaryKey)
      throws SQLException {
    // The names are passed as-is here; only getTables/getColumns below escape their arguments.
    try (ResultSet rs = dbm.getPrimaryKeys(catalogName, schemaName, tableName)) {
      int count = 0;
      while (rs.next()) {
        assertEquals(primaryKey, rs.getString("COLUMN_NAME"));
        count += 1;
      }
      return count;
    }
  }

  /** Counts the rows {@code getTables} returns for the escaped catalog, schema and table names. */
  private int countTablesResult(String catalogName, String schemaName, String tableName)
      throws SQLException {
    // Locals are spelled out so that none of them shadows the field `t`.
    String escape = dbm.getSearchStringEscape();
    String catalogPattern = JdbcUtils.escapeSearchString(catalogName, escape);
    String schemaPattern = JdbcUtils.escapeSearchString(schemaName, escape);
    String tablePattern = JdbcUtils.escapeSearchString(tableName, escape);
    try (ResultSet rs = dbm.getTables(catalogPattern, schemaPattern, tablePattern, null)) {
      return countResultSet(rs);
    }
  }

  /** Counts the rows {@code getColumns} returns for the escaped catalog, schema and table names. */
  private int countColumnsResult(String catalogName, String schemaName, String tableName)
      throws SQLException {
    String escape = dbm.getSearchStringEscape();
    String catalogPattern = JdbcUtils.escapeSearchString(catalogName, escape);
    String schemaPattern = JdbcUtils.escapeSearchString(schemaName, escape);
    String tablePattern = JdbcUtils.escapeSearchString(tableName, escape);
    try (ResultSet rs = dbm.getColumns(catalogPattern, schemaPattern, tablePattern, null)) {
      return countResultSet(rs);
    }
  }

  /** Consumes the result set and returns how many rows it contained. */
  private int countResultSet(ResultSet rs) throws SQLException {
    int count = 0;
    while (rs.next()) {
      count += 1;
    }
    return count;
  }
}
Loading