Skip to content

Commit

Permalink
Add initial supplied schema validation to prevent inconsistency betwe…
Browse files Browse the repository at this point in the history
…en supplied schema (expected data format) and an actual data format, returned by a SQL query.

Reorganize some code to make locations more logical.

Always use generated Avro schema.
The optional user-provided schema is used only to retrieve metadata such as `doc` fields.
  • Loading branch information
Ruslan Altynnikov committed Feb 11, 2022
1 parent 2315e22 commit bcd97b7
Show file tree
Hide file tree
Showing 14 changed files with 485 additions and 128 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ com.spotify.dbeam.options.JdbcExportPipelineOptions:
--avroSchemaFilePath=<String>
Path to file with a target AVRO schema.
--avroSchemaName=<String>
The name of the generated avro schema. By default it uses the table name.
The name of the generated avro schema. By default the table name is used.
--avroSchemaNamespace=<String>
Default: dbeam_generated
The namespace of the generated avro schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.sql.Connection;
import java.time.Duration;
import java.util.Optional;
import org.apache.avro.Schema;

@AutoValue
public abstract class JdbcExportArgs implements Serializable {
Expand All @@ -47,8 +46,6 @@ public abstract class JdbcExportArgs implements Serializable {

public abstract Duration exportTimeout();

public abstract Optional<Schema> inputAvroSchema();

@AutoValue.Builder
abstract static class Builder {

Expand All @@ -66,8 +63,6 @@ abstract static class Builder {

abstract Builder setExportTimeout(Duration exportTimeout);

abstract Builder setInputAvroSchema(Optional<Schema> inputAvroSchema);

abstract JdbcExportArgs build();
}

Expand All @@ -81,8 +76,7 @@ static JdbcExportArgs create(
Optional.empty(),
Optional.empty(),
false,
Duration.ofDays(7),
Optional.empty());
Duration.ofDays(7));
}

public static JdbcExportArgs create(
Expand All @@ -92,8 +86,7 @@ public static JdbcExportArgs create(
final Optional<String> avroSchemaName,
final Optional<String> avroDoc,
final Boolean useAvroLogicalTypes,
final Duration exportTimeout,
final Optional<Schema> inputAvroSchema) {
final Duration exportTimeout) {
return new AutoValue_JdbcExportArgs.Builder()
.setJdbcAvroOptions(jdbcAvroArgs)
.setQueryBuilderArgs(queryBuilderArgs)
Expand All @@ -102,7 +95,6 @@ public static JdbcExportArgs create(
.setAvroDoc(avroDoc)
.setUseAvroLogicalTypes(useAvroLogicalTypes)
.setExportTimeout(exportTimeout)
.setInputAvroSchema(inputAvroSchema)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*-
* -\-\-
* DBeam Core
* --
* Copyright (C) 2016 - 2018 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.dbeam.avro;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Supplies record-level and field-level metadata (name, namespace, doc) used when an Avro schema
 * is generated.
 *
 * <p>Metadata comes from two sources: an optional user-provided schema and individual argument
 * values. When a provided schema is present it has priority over the individual arguments. Any of
 * the constructor arguments may be {@code null}.
 */
public class AvroSchemaMetadataProvider {

  private static final Logger LOGGER = LoggerFactory.getLogger(AvroSchemaMetadataProvider.class);

  // provided schema has a priority over single arguments' values.
  private final Schema provided;
  private final String avroSchemaName;
  private final String avroSchemaNamespace;
  private final String avroDoc;

  /**
   * Creates a provider that has no user-provided schema and relies on argument values only.
   *
   * @param avroSchemaName record name to use, or {@code null} to use the caller's default
   * @param avroSchemaNamespace record namespace to use
   * @param avroDoc record doc to use, or {@code null} to use the caller's default
   */
  public AvroSchemaMetadataProvider(
      final String avroSchemaName, final String avroSchemaNamespace, final String avroDoc) {
    this(null, avroSchemaName, avroSchemaNamespace, avroDoc);
  }

  /**
   * Creates a provider backed by an optional user-provided schema.
   *
   * @param provided user-provided schema, or {@code null} if there is none
   * @param avroSchemaName record name to use, or {@code null} to use the caller's default
   * @param avroSchemaNamespace record namespace to use
   * @param avroDoc record doc to use, or {@code null} to use the caller's default
   */
  public AvroSchemaMetadataProvider(
      final Schema provided,
      final String avroSchemaName,
      final String avroSchemaNamespace,
      final String avroDoc) {
    this.provided = provided;
    this.avroSchemaName = avroSchemaName;
    this.avroSchemaNamespace = avroSchemaNamespace;
    this.avroDoc = avroDoc;
  }

  /**
   * Returns the record-level doc.
   *
   * <p>Resolution order: doc of the provided schema, then the {@code avroDoc} argument, then
   * {@code defaultVal}. A provided schema that declares no doc does not suppress the fallbacks.
   *
   * @param defaultVal value returned when no other source supplies a doc
   * @return the resolved doc
   */
  public String avroDoc(final String defaultVal) {
    if (provided != null) {
      final String providedDoc = provided.getDoc();
      if (providedDoc != null) {
        return providedDoc;
      }
    }
    return (avroDoc != null) ? avroDoc : defaultVal;
  }

  /**
   * Returns the record name.
   *
   * <p>With a provided schema its name is used (or {@code defaultVal} if it is {@code null});
   * otherwise the {@code avroSchemaName} argument is used (or {@code defaultVal} if it is
   * {@code null}).
   *
   * @param defaultVal value returned when the selected source has no name
   * @return the resolved record name
   */
  public String avroSchemaName(final String defaultVal) {
    if (provided != null) {
      final String name = provided.getName();
      return (name != null) ? name : defaultVal;
    } else {
      return avroSchemaName != null ? avroSchemaName : defaultVal;
    }
  }

  /**
   * Returns the record namespace.
   *
   * <p>With a provided schema its namespace is returned as is, without falling back to the
   * {@code avroSchemaNamespace} argument; otherwise the argument value is returned.
   *
   * @return the namespace of the provided schema, or the configured namespace if no schema was
   *     provided
   */
  public String avroSchemaNamespace() {
    return (provided != null) ? provided.getNamespace() : avroSchemaNamespace;
  }

  /**
   * Returns the doc for a single field.
   *
   * <p>The doc is looked up by field name in the provided schema. {@code defaultVal} is returned
   * when there is no provided schema, when the field is not found in it (a warning is logged), or
   * when the field is found but declares no doc.
   *
   * @param fieldName name of the field to look up in the provided schema
   * @param defaultVal value returned when the provided schema supplies no doc for the field
   * @return the resolved field doc
   */
  public String getFieldDoc(final String fieldName, final String defaultVal) {
    if (provided == null) {
      return defaultVal;
    }
    final Schema.Field field = provided.getField(fieldName);
    if (field == null) {
      LOGGER.warn("Field [{}] not found in a provided schema", fieldName);
      return defaultVal;
    }
    // A field without a doc must not erase the generated default description.
    final String fieldDoc = field.doc();
    return (fieldDoc != null) ? fieldDoc : defaultVal;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
Expand All @@ -42,7 +41,7 @@

public class BeamJdbcAvroSchema {

private static Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class);
private static final Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class);

/**
* Generate Avro schema by reading one row. Expose Beam metrics via a Beam PTransform.
Expand All @@ -54,10 +53,13 @@ public class BeamJdbcAvroSchema {
* @throws Exception in case of failure to query database
*/
public static Schema createSchema(
final Pipeline pipeline, final JdbcExportArgs args, final Connection connection)
final Pipeline pipeline,
final JdbcExportArgs args,
final Connection connection,
final AvroSchemaMetadataProvider provider)
throws Exception {
final long startTime = System.nanoTime();
final Schema generatedSchema = generateAvroSchema(args, connection);
final Schema generatedSchema = generateAvroSchema(args, connection, provider);
final long elapsedTimeSchema = (System.nanoTime() - startTime) / 1000000;
LOGGER.info("Elapsed time to schema {} seconds", elapsedTimeSchema / 1000.0);

Expand All @@ -78,35 +80,24 @@ public static Schema createSchema(
return generatedSchema;
}

private static Schema generateAvroSchema(final JdbcExportArgs args, final Connection connection)
private static Schema generateAvroSchema(
final JdbcExportArgs args,
final Connection connection,
final AvroSchemaMetadataProvider provider)
throws SQLException {
final String dbUrl = connection.getMetaData().getURL();
final String avroDoc =
args.avroDoc()
.orElseGet(() -> String.format("Generate schema from JDBC ResultSet from %s", dbUrl));
return JdbcAvroSchema.createSchemaByReadingOneRow(
connection,
args.queryBuilderArgs(),
args.avroSchemaNamespace(),
args.avroSchemaName(),
avroDoc,
args.useAvroLogicalTypes());
}

public static Optional<Schema> parseOptionalInputAvroSchemaFile(final String filename)
throws IOException {

if (filename == null || filename.isEmpty()) {
return Optional.empty();
}

return Optional.of(parseInputAvroSchemaFile(filename));
connection, args.queryBuilderArgs(), args.useAvroLogicalTypes(), provider);
}

/**
 * Parse an Avro schema from a file.
 *
 * @param filename file spec of the schema file; it must match exactly one file
 * @return the parsed Avro schema
 * @throws IOException in case of failure to match, open or read the file
 */
public static Schema parseInputAvroSchemaFile(final String filename) throws IOException {
  final MatchResult.Metadata m = FileSystems.matchSingleFileSpec(filename);
  // Close the stream ourselves; the schema parser is not relied upon to release it.
  try (InputStream inputStream = Channels.newInputStream(FileSystems.open(m.resourceId()))) {
    final Schema schema = new Schema.Parser().parse(inputStream);

    LOGGER.info("Parsed the provided schema from: [{}]", filename);

    return schema;
  }
}

}
42 changes: 19 additions & 23 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.slf4j.Logger;
Expand All @@ -65,49 +64,43 @@ public class JdbcAvroSchema {
public static Schema createSchemaByReadingOneRow(
final Connection connection,
final QueryBuilderArgs queryBuilderArgs,
final String avroSchemaNamespace,
final Optional<String> schemaName,
final String avroDoc,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final AvroSchemaMetadataProvider provider)
throws SQLException {
LOGGER.debug("Creating Avro schema based on the first read row from the database");
try (Statement statement = connection.createStatement()) {
final ResultSet resultSet = statement.executeQuery(queryBuilderArgs.sqlQueryWithLimitOne());

final Schema schema =
createAvroSchema(
resultSet,
avroSchemaNamespace,
connection.getMetaData().getURL(),
schemaName,
avroDoc,
useLogicalTypes);
createAvroSchema(resultSet, connection.getMetaData().getURL(), useLogicalTypes, provider);
LOGGER.info("Schema created successfully. Generated schema: {}", schema.toString());
return schema;
}
}

public static Schema createAvroSchema(
final ResultSet resultSet,
final String avroSchemaNamespace,
final String connectionUrl,
final Optional<String> maybeSchemaName,
final String avroDoc,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final AvroSchemaMetadataProvider provider)
throws SQLException {

final ResultSetMetaData meta = resultSet.getMetaData();
final String tableName = getDatabaseTableName(meta);
final String schemaName = maybeSchemaName.orElse(tableName);
final String recordName = provider.avroSchemaName(tableName);
final String namespace = provider.avroSchemaNamespace();
final String recordDoc =
provider.avroDoc(
String.format("Generate schema from JDBC ResultSet from %s", connectionUrl));

final SchemaBuilder.FieldAssembler<Schema> builder =
SchemaBuilder.record(schemaName)
.namespace(avroSchemaNamespace)
.doc(avroDoc)
SchemaBuilder.record(recordName)
.namespace(namespace)
.doc(recordDoc)
.prop("tableName", tableName)
.prop("connectionUrl", connectionUrl)
.fields();
return createAvroFields(meta, builder, useLogicalTypes).endRecord();
return createAvroFields(meta, builder, useLogicalTypes, provider).endRecord();
}

static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException {
Expand All @@ -125,7 +118,8 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep
private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final ResultSetMetaData meta,
final SchemaBuilder.FieldAssembler<Schema> builder,
final boolean useLogicalTypes)
final boolean useLogicalTypes,
final AvroSchemaMetadataProvider provider)
throws SQLException {

for (int i = 1; i <= meta.getColumnCount(); i++) {
Expand All @@ -142,7 +136,9 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final SchemaBuilder.FieldBuilder<Schema> field =
builder
.name(normalizeForAvro(columnName))
.doc(String.format("From sqlType %d %s", columnType, typeName))
.doc(
provider.getFieldDoc(
columnName, String.format("From sqlType %d %s", columnType, typeName)))
.prop("columnName", columnName)
.prop("sqlCode", String.valueOf(columnType))
.prop("typeName", typeName);
Expand Down
29 changes: 24 additions & 5 deletions dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.base.Preconditions;
import com.spotify.dbeam.args.JdbcExportArgs;
import com.spotify.dbeam.avro.AvroSchemaMetadataProvider;
import com.spotify.dbeam.avro.BeamJdbcAvroSchema;
import com.spotify.dbeam.avro.JdbcAvroIO;
import com.spotify.dbeam.avro.JdbcAvroMetering;
Expand Down Expand Up @@ -143,12 +144,30 @@ public void prepareExport() throws Exception {
output, ".avro", generatedSchema, jdbcExportArgs.jdbcAvroOptions()));
}

private Schema createSchema(final Connection connection) throws Exception {
if (this.jdbcExportArgs.inputAvroSchema().isPresent()) {
return this.jdbcExportArgs.inputAvroSchema().get();
} else {
return BeamJdbcAvroSchema.createSchema(this.pipeline, jdbcExportArgs, connection);
Schema createSchema(final Connection connection) throws Exception {

AvroSchemaMetadataProvider metadataProvider =
new AvroSchemaMetadataProvider(
getProvidedSchema(),
jdbcExportArgs.avroSchemaName().orElse(null),
jdbcExportArgs.avroSchemaNamespace(),
jdbcExportArgs.avroDoc().orElse(null));

Schema generatedSchema =
BeamJdbcAvroSchema.createSchema(
this.pipeline, jdbcExportArgs, connection, metadataProvider);

return generatedSchema;
}

/**
 * Loads the user-provided Avro schema, if a schema file path was configured.
 *
 * @return the schema parsed from the configured file path, or {@code null} when the path is
 *     {@code null} or empty
 * @throws IOException if parsing the schema file fails
 */
Schema getProvidedSchema() throws IOException {
  final JdbcExportPipelineOptions exportOptions =
      pipelineOptions.as(JdbcExportPipelineOptions.class);
  final String schemaFilePath = exportOptions.getAvroSchemaFilePath();

  // No path configured means there is no schema to load.
  if (schemaFilePath == null || schemaFilePath.isEmpty()) {
    return null;
  }

  return BeamJdbcAvroSchema.parseInputAvroSchemaFile(schemaFilePath);
}

public Pipeline getPipeline() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.spotify.dbeam.args.JdbcConnectionArgs;
import com.spotify.dbeam.args.JdbcExportArgs;
import com.spotify.dbeam.args.QueryBuilderArgs;
import com.spotify.dbeam.avro.BeamJdbcAvroSchema;
import com.spotify.dbeam.beam.BeamHelper;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -77,8 +76,7 @@ public static JdbcExportArgs fromPipelineOptions(final PipelineOptions options)
Optional.ofNullable(exportOptions.getAvroSchemaName()),
Optional.ofNullable(exportOptions.getAvroDoc()),
exportOptions.isUseAvroLogicalTypes(),
Duration.parse(exportOptions.getExportTimeout()),
BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(exportOptions.getAvroSchemaFilePath()));
Duration.parse(exportOptions.getExportTimeout()));
}

public static QueryBuilderArgs createQueryArgs(final JdbcExportPipelineOptions options)
Expand Down
Loading

0 comments on commit bcd97b7

Please sign in to comment.