Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ $ ./gradlew gem # -t to watch change of files and rebuild continuously

## TEST

```
$ EMBULK_OUTPUT_DATABRICKS_TEST_CONFIG="example/test.yml" ./gradlew test # Create example/test.yml based on example/test.yml.example
```
84 changes: 3 additions & 81 deletions src/main/java/org/embulk/output/DatabricksOutputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.embulk.output;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import org.embulk.config.ConfigDiff;
Expand Down Expand Up @@ -171,86 +169,10 @@ protected void logConnectionProperties(String url, Properties props) {
super.logConnectionProperties(url, maskedProps);
}

// This is almost a copy of the AbstractJdbcOutputPlugin implementation, except for the
// validation that the table exists in the current schema.
public Optional<JdbcSchema> newJdbcSchemaFromTableIfExists(
JdbcOutputConnection connection, TableIdentifier table) throws SQLException {
if (!connection.tableExists(table)) {
// DatabaseMetaData.getPrimaryKeys fails if table does not exist
return Optional.empty();
}

DatabricksOutputConnection conn = (DatabricksOutputConnection) connection;
DatabaseMetaData dbm = connection.getMetaData();
String escape = dbm.getSearchStringEscape();

ResultSet rs =
dbm.getPrimaryKeys(conn.getCatalogName(), table.getSchemaName(), table.getTableName());
final HashSet<String> primaryKeysBuilder = new HashSet<>();
try {
while (rs.next()) {
if (!((DatabricksOutputConnection) connection)
.isAvailableTableMetadataInConnection(rs, table)) {
continue;
}
primaryKeysBuilder.add(rs.getString("COLUMN_NAME"));
}
} finally {
rs.close();
}
final Set<String> primaryKeys = Collections.unmodifiableSet(primaryKeysBuilder);

final ArrayList<JdbcColumn> builder = new ArrayList<>();
// NOTE: TIMESTAMP_NTZ and INTERVAL columns are not included in the getColumns result.
// This causes a runtime SQL exception during COPY INTO
// (probably because these types are unsupported by the Databricks JDBC driver).
// https://docs.databricks.com/en/sql/language-manual/data-types/interval-type.html
// https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html#notes
rs =
dbm.getColumns(
JdbcUtils.escapeSearchString(conn.getCatalogName(), escape),
JdbcUtils.escapeSearchString(table.getSchemaName(), escape),
JdbcUtils.escapeSearchString(table.getTableName(), escape),
null);
try {
while (rs.next()) {
if (!((DatabricksOutputConnection) connection)
.isAvailableTableMetadataInConnection(rs, table)) {
continue;
}
String columnName = rs.getString("COLUMN_NAME");
String simpleTypeName = rs.getString("TYPE_NAME").toUpperCase(Locale.ENGLISH);
boolean isUniqueKey = primaryKeys.contains(columnName);
int sqlType = rs.getInt("DATA_TYPE");
int colSize = rs.getInt("COLUMN_SIZE");
int decDigit = rs.getInt("DECIMAL_DIGITS");
if (rs.wasNull()) {
decDigit = -1;
}
int charOctetLength = rs.getInt("CHAR_OCTET_LENGTH");
boolean isNotNull = "NO".equals(rs.getString("IS_NULLABLE"));
// rs.getString("COLUMN_DEF") // or null // TODO
builder.add(
JdbcColumn.newGenericTypeColumn(
columnName,
sqlType,
simpleTypeName,
colSize,
decDigit,
charOctetLength,
isNotNull,
isUniqueKey));
// We can't get declared column name using JDBC API.
// Subclasses need to overwrite it.
}
} finally {
rs.close();
}
final List<JdbcColumn> columns = Collections.unmodifiableList(builder);
if (columns.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(new JdbcSchema(columns));
}
return super.newJdbcSchemaFromTableIfExists(
connection,
((DatabricksOutputConnection) connection).currentConnectionTableIdentifier(table));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,58 +42,21 @@ protected void executeUseStatement(String sql) throws SQLException {
// This is almost a copy of the JdbcOutputConnection implementation, except for the validation
// that the table exists in the current schema.
public boolean tableExists(TableIdentifier table) throws SQLException {
try (ResultSet rs =
connection
.getMetaData()
.getTables(catalogName, table.getSchemaName(), table.getTableName(), null)) {
while (rs.next()) {
if (isAvailableTableMetadataInConnection(rs, table)) {
return true;
}
}
}
return false;
return super.tableExists(currentConnectionTableIdentifier(table));
}

public boolean isAvailableTableMetadataInConnection(ResultSet rs, TableIdentifier tableIdentifier)
throws SQLException {
// If unchecked, tables in other catalogs may appear to exist.
// This is because the base embulk jdbc plugin's tableIdentifier.getDatabase() often returns
// null,
// and one Databricks connection has multiple available catalogs (databases).

// NOTE: maybe this logic is not necessary anymore after this PR:
// https://github.com/trocco-io/embulk-output-databricks/pull/11
// But I'm not sure, so I'll keep it for now.

if (tableIdentifier.getDatabase() == null) {
logger.trace("tableIdentifier.getDatabase() == null, check by instance variable");
if (!rs.getString("TABLE_CAT").equalsIgnoreCase(catalogName)) {
return false;
}
}
if (tableIdentifier.getSchemaName() == null) {
logger.trace("tableIdentifier.getSchemaName() == null, check by instance variable");
if (!rs.getString("TABLE_SCHEM").equalsIgnoreCase(schemaName)) {
return false;
}
}

if (tableIdentifier.getDatabase() != null
&& !tableIdentifier.getDatabase().equalsIgnoreCase(catalogName)) {
logger.error(
String.format(
"tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
tableIdentifier.getDatabase(), catalogName));
}
if (tableIdentifier.getSchemaName() != null
&& !tableIdentifier.getSchemaName().equalsIgnoreCase(schemaName)) {
logger.error(
String.format(
"tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
tableIdentifier.getSchemaName(), schemaName));
}
return true;
public TableIdentifier currentConnectionTableIdentifier(TableIdentifier tableIdentifier) {
// Caution:
// JdbcOutputPlugin sometimes uses a tableIdentifier whose database is null,
// which causes unexpected DatabaseMetaData behavior in AbstractJdbcOutputPlugin.
// For example, getTables and getColumns search in all catalogs,
// not just the connection's default catalog,
// and can't search in schemas with multibyte names.
// So, if the tableIdentifier's database or schema is null, the connection's default is used.
return new TableIdentifier(
tableIdentifier.getDatabase() != null ? tableIdentifier.getDatabase() : catalogName,
tableIdentifier.getSchemaName() != null ? tableIdentifier.getSchemaName() : schemaName,
tableIdentifier.getTableName());
}

@Override
Expand Down Expand Up @@ -299,8 +262,4 @@ private String buildColumns(JdbcSchema schema, String prefix) {
}
return sb.toString();
}

public String getCatalogName() {
return catalogName;
}
}
Loading