Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always use an auto-generated doc value as a back-up for Avro doc-related metadata retrieval. #377

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,23 @@ com.spotify.dbeam.options.JdbcExportPipelineOptions:
Controls whether generated Avro schema will contain logicalTypes or not.
```

#### Input Avro schema file
#### Input (provided) Avro schema file

If provided an input Avro schema file, dbeam will read input schema file and use some of the
If an input Avro schema file is provided (via the `--avroSchemaFilePath` parameter), dbeam will read the schema file and use some of the
properties when an output Avro schema is created.

#### Following fields will be propagated from input into output schema:
#### The following fields will be propagated from a provided input schema into the output schema:

* `record.doc`
* `record.name`
* `record.namespace`
* `record.doc`
* `record.field.doc`

#### Precedence rules: Avro record `doc` value
Avro record `doc` value can be set using three means (in order of precedence):
1. From a provided Avro schema `record.doc`
2. From a command line parameter (`--avroDoc`)
3. An automatically generated value

#### DBeam Parallel Mode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.sql.Connection;
import java.time.Duration;
import java.util.Optional;
import org.apache.avro.Schema;

@AutoValue
public abstract class JdbcExportArgs implements Serializable {
Expand All @@ -47,8 +46,6 @@ public abstract class JdbcExportArgs implements Serializable {

public abstract Duration exportTimeout();

public abstract Optional<Schema> inputAvroSchema();

@AutoValue.Builder
abstract static class Builder {

Expand All @@ -66,8 +63,6 @@ abstract static class Builder {

abstract Builder setExportTimeout(Duration exportTimeout);

abstract Builder setInputAvroSchema(Optional<Schema> inputAvroSchema);

abstract JdbcExportArgs build();
}

Expand All @@ -81,8 +76,7 @@ static JdbcExportArgs create(
Optional.empty(),
Optional.empty(),
false,
Duration.ofDays(7),
Optional.empty());
Duration.ofDays(7));
}

public static JdbcExportArgs create(
Expand All @@ -92,8 +86,7 @@ public static JdbcExportArgs create(
final Optional<String> avroSchemaName,
final Optional<String> avroDoc,
final Boolean useAvroLogicalTypes,
final Duration exportTimeout,
final Optional<Schema> inputAvroSchema) {
final Duration exportTimeout) {
return new AutoValue_JdbcExportArgs.Builder()
.setJdbcAvroOptions(jdbcAvroArgs)
.setQueryBuilderArgs(queryBuilderArgs)
Expand All @@ -102,7 +95,6 @@ public static JdbcExportArgs create(
.setAvroDoc(avroDoc)
.setUseAvroLogicalTypes(useAvroLogicalTypes)
.setExportTimeout(exportTimeout)
.setInputAvroSchema(inputAvroSchema)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*-
* -\-\-
* DBeam Core
* --
* Copyright (C) 2016 - 2018 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.dbeam.avro;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves Avro schema metadata (record name, namespace, doc, per-field doc) with a fixed
 * precedence: values from a provided (user-supplied) schema win over the individual
 * command-line values, and callers supply an auto-generated value as the final fallback.
 */
public class AvroSchemaMetadataProvider {

  // NOTE: made final — same fix this change applies to BeamJdbcAvroSchema's logger.
  private static final Logger LOGGER =
      LoggerFactory.getLogger(AvroSchemaMetadataProvider.class);

  // A provided schema has priority over the single command-line argument values.
  private final Schema provided;
  private final String avroSchemaName;
  private final String avroSchemaNamespace;
  private final String avroDoc;

  /** Creates a provider without a user-supplied schema; only command-line values apply. */
  public AvroSchemaMetadataProvider(
      final String avroSchemaName, final String avroSchemaNamespace, final String avroDoc) {
    this(null, avroSchemaName, avroSchemaNamespace, avroDoc);
  }

  /**
   * Creates a provider backed by an optional user-supplied schema.
   *
   * @param provided user-supplied Avro schema, or {@code null} when none was given
   * @param avroSchemaName record name from the command line, or {@code null}
   * @param avroSchemaNamespace record namespace from the command line, or {@code null}
   * @param avroDoc record doc from the command line, or {@code null}
   */
  public AvroSchemaMetadataProvider(
      final Schema provided,
      final String avroSchemaName,
      final String avroSchemaNamespace,
      final String avroDoc) {
    this.provided = provided;
    this.avroSchemaName = avroSchemaName;
    this.avroSchemaNamespace = avroSchemaNamespace;
    this.avroDoc = avroDoc;
  }

  /**
   * Returns the record doc: provided schema's doc, else the command-line doc, else
   * {@code defaultVal}.
   */
  public String avroDoc(final String defaultVal) {
    return (provided != null && provided.getDoc() != null)
        ? provided.getDoc()
        : (avroDoc != null) ? avroDoc : defaultVal;
  }

  /**
   * Returns the record name: provided schema's name, else the command-line name, else
   * {@code defaultVal}.
   */
  public String avroSchemaName(final String defaultVal) {
    return (provided != null && provided.getName() != null)
        ? provided.getName()
        : (avroSchemaName != null) ? avroSchemaName : defaultVal;
  }

  /** Returns the record namespace: provided schema's namespace, else the command-line one. */
  public String avroSchemaNamespace() {
    return (provided != null && provided.getNamespace() != null)
        ? provided.getNamespace()
        : avroSchemaNamespace;
  }

  /**
   * Returns the doc for {@code fieldName} from the provided schema, falling back to
   * {@code defaultVal} when no schema was provided, the field is absent, or the field has
   * no doc attribute.
   */
  public String getFieldDoc(final String fieldName, final String defaultVal) {
    if (provided == null) {
      return defaultVal;
    }
    final Schema.Field field = provided.getField(fieldName);
    if (field == null) {
      LOGGER.warn(
          "Field [{}] not found in the provided schema [{}]", fieldName, provided.getName());
      return defaultVal;
    }
    // BUGFIX: field.doc() may be null when the provided schema omits the doc attribute;
    // per the documented precedence rules, always fall back to the auto-generated default.
    return field.doc() != null ? field.doc() : defaultVal;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
Expand All @@ -42,7 +41,7 @@

public class BeamJdbcAvroSchema {

private static Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class);
private static final Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class);

/**
* Generate Avro schema by reading one row. Expose Beam metrics via a Beam PTransform.
Expand All @@ -54,10 +53,13 @@ public class BeamJdbcAvroSchema {
* @throws Exception in case of failure to query database
*/
public static Schema createSchema(
final Pipeline pipeline, final JdbcExportArgs args, final Connection connection)
final Pipeline pipeline,
final JdbcExportArgs args,
final Connection connection,
final AvroSchemaMetadataProvider provider)
throws Exception {
final long startTime = System.nanoTime();
final Schema generatedSchema = generateAvroSchema(args, connection);
final Schema generatedSchema = generateAvroSchema(args, connection, provider);
final long elapsedTimeSchema = (System.nanoTime() - startTime) / 1000000;
LOGGER.info("Elapsed time to schema {} seconds", elapsedTimeSchema / 1000.0);

Expand All @@ -78,35 +80,24 @@ public static Schema createSchema(
return generatedSchema;
}

private static Schema generateAvroSchema(final JdbcExportArgs args, final Connection connection)
private static Schema generateAvroSchema(
final JdbcExportArgs args,
final Connection connection,
final AvroSchemaMetadataProvider provider)
throws SQLException {
final String dbUrl = connection.getMetaData().getURL();
final String avroDoc =
args.avroDoc()
.orElseGet(() -> String.format("Generate schema from JDBC ResultSet from %s", dbUrl));
return JdbcAvroSchema.createSchemaByReadingOneRow(
connection,
args.queryBuilderArgs(),
args.avroSchemaNamespace(),
args.avroSchemaName(),
avroDoc,
args.useAvroLogicalTypes());
}

public static Optional<Schema> parseOptionalInputAvroSchemaFile(final String filename)
throws IOException {

if (filename == null || filename.isEmpty()) {
return Optional.empty();
}

return Optional.of(parseInputAvroSchemaFile(filename));
connection, args.queryBuilderArgs(), args.useAvroLogicalTypes(), provider);
}

/**
 * Reads and parses an Avro schema from the given file spec.
 *
 * @param filename file spec resolvable by Beam's {@code FileSystems}
 * @return the parsed Avro schema
 * @throws IOException if the file cannot be matched, opened, or read
 */
public static Schema parseInputAvroSchemaFile(final String filename) throws IOException {
  final MatchResult.Metadata m = FileSystems.matchSingleFileSpec(filename);

  // BUGFIX: close the stream after parsing; the original leaked the InputStream.
  try (InputStream inputStream = Channels.newInputStream(FileSystems.open(m.resourceId()))) {
    final Schema schema = new Schema.Parser().parse(inputStream);
    LOGGER.info("Parsed the provided schema from: [{}]", filename);
    return schema;
  }
}

}
70 changes: 39 additions & 31 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.slf4j.Logger;
Expand All @@ -65,49 +64,43 @@ public class JdbcAvroSchema {
public static Schema createSchemaByReadingOneRow(
final Connection connection,
final QueryBuilderArgs queryBuilderArgs,
final String avroSchemaNamespace,
final Optional<String> schemaName,
final String avroDoc,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final AvroSchemaMetadataProvider provider)
throws SQLException {
LOGGER.debug("Creating Avro schema based on the first read row from the database");
try (Statement statement = connection.createStatement()) {
final ResultSet resultSet = statement.executeQuery(queryBuilderArgs.sqlQueryWithLimitOne());

final Schema schema =
createAvroSchema(
resultSet,
avroSchemaNamespace,
connection.getMetaData().getURL(),
schemaName,
avroDoc,
useLogicalTypes);
createAvroSchema(resultSet, connection.getMetaData().getURL(), useLogicalTypes, provider);
LOGGER.info("Schema created successfully. Generated schema: {}", schema.toString());
return schema;
}
}

public static Schema createAvroSchema(
final ResultSet resultSet,
final String avroSchemaNamespace,
final String connectionUrl,
final Optional<String> maybeSchemaName,
final String avroDoc,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final AvroSchemaMetadataProvider provider)
throws SQLException {

final ResultSetMetaData meta = resultSet.getMetaData();
final String tableName = getDatabaseTableName(meta);
final String schemaName = maybeSchemaName.orElse(tableName);
final String recordName = provider.avroSchemaName(tableName);
final String namespace = provider.avroSchemaNamespace();
final String recordDoc =
provider.avroDoc(
String.format("Generate schema from JDBC ResultSet from %s", connectionUrl));

final SchemaBuilder.FieldAssembler<Schema> builder =
SchemaBuilder.record(schemaName)
.namespace(avroSchemaNamespace)
.doc(avroDoc)
SchemaBuilder.record(recordName)
.namespace(namespace)
.doc(recordDoc)
.prop("tableName", tableName)
.prop("connectionUrl", connectionUrl)
.fields();
return createAvroFields(meta, builder, useLogicalTypes).endRecord();
return createAvroFields(meta, builder, useLogicalTypes, provider).endRecord();
}

static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException {
Expand All @@ -125,25 +118,26 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep
private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final ResultSetMetaData meta,
final SchemaBuilder.FieldAssembler<Schema> builder,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final AvroSchemaMetadataProvider provider)
throws SQLException {

final StringBuilder sqlMetadataLog = new StringBuilder("Sql ResultSet metadata: { ");

for (int i = 1; i <= meta.getColumnCount(); i++) {

final String columnName;
if (meta.getColumnName(i).isEmpty()) {
columnName = meta.getColumnLabel(i);
} else {
columnName = meta.getColumnName(i);
}

final String columnName = getColumnName(meta, i);
final int columnType = meta.getColumnType(i);
final String typeName = JDBCType.valueOf(columnType).getName();
final String columnClassName = meta.getColumnClassName(i);
final SchemaBuilder.FieldBuilder<Schema> field =
builder
.name(normalizeForAvro(columnName))
.doc(String.format("From sqlType %d %s (%s)", columnType, typeName, columnClassName))
.doc(
provider.getFieldDoc(
columnName,
String.format(
"From sqlType %d %s (%s)", columnType, typeName, columnClassName)))
.prop("columnName", columnName)
.prop("sqlCode", String.valueOf(columnType))
.prop("typeName", typeName)
Expand All @@ -162,10 +156,25 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
fieldSchemaBuilder);

schemaFieldAssembler.endUnion().nullDefault();

sqlMetadataLog.append(String.format("#[%d] name[%s] type[%s], ", i, columnName, typeName));
}

LOGGER.info(sqlMetadataLog.append(" }").toString());

return builder;
}

/**
 * Returns the column's name, preferring the real column name and falling back to the
 * column label for aliased or expression columns that report an empty name.
 */
private static String getColumnName(ResultSetMetaData meta, int i) throws SQLException {
  final String name = meta.getColumnName(i);
  return name.isEmpty() ? meta.getColumnLabel(i) : name;
}

/**
* Creates Avro field schema based on JDBC MetaData
*
Expand All @@ -176,7 +185,6 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
* <li>{@link com.mysql.cj.MysqlType }
* <li>{@link org.h2.value.TypeInfo }
* </ul>
*
*/
private static SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>
setAvroColumnType(
Expand Down
Loading