CC-7246: add ability to partition based on timestamp of a record value field #246

Merged
merged 5 commits into from Feb 27, 2020

Changes from 3 commits

@@ -96,17 +96,6 @@ private BigQuery getBigQuery() {
return new BigQueryHelper().setKeySource(keySource).connect(projectName, key);
}

private SchemaManager getSchemaManager(BigQuery bigQuery) {
if (testSchemaManager != null) {
return testSchemaManager;
}
SchemaRetriever schemaRetriever = config.getSchemaRetriever();
SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter = config.getSchemaConverter();
Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName, kafkaDataFieldName);
}

private void ensureExistingTables() {
BigQuery bigQuery = getBigQuery();
Map<String, TableId> topicsToTableIds = TopicToTableResolver.getTopicsToTables(config);

@@ -279,7 +279,9 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
config.getSchemaConverter();
Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
Optional<String> kafkaDataFieldName = config.getKafkaDataFieldName();
Optional<String> timestampPartitionFieldName = config.getTimestampPartitionFieldName();
return new SchemaManager(schemaRetriever, schemaConverter, bigQuery, kafkaKeyFieldName,
kafkaDataFieldName, timestampPartitionFieldName);
}

private BigQueryWriter getBigQueryWriter() {

@@ -8,6 +8,7 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.TimePartitioning.Type;
import com.wepay.kafka.connect.bigquery.api.KafkaSchemaRecordType;
import com.wepay.kafka.connect.bigquery.api.SchemaRetriever;
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
@@ -32,6 +33,7 @@ public class SchemaManager {
private final BigQuery bigQuery;
private final Optional<String> kafkaKeyFieldName;
private final Optional<String> kafkaDataFieldName;
private final Optional<String> timestampPartitionFieldName;

/**
* @param schemaRetriever Used to determine the Kafka Connect Schema that should be used for a
@@ -48,12 +50,14 @@ public SchemaManager(
SchemaConverter<com.google.cloud.bigquery.Schema> schemaConverter,
BigQuery bigQuery,
Optional<String> kafkaKeyFieldName,
Optional<String> kafkaDataFieldName,
Optional<String> timestampPartitionFieldName) {
this.schemaRetriever = schemaRetriever;
this.schemaConverter = schemaConverter;
this.bigQuery = bigQuery;
this.kafkaKeyFieldName = kafkaKeyFieldName;
this.kafkaDataFieldName = kafkaDataFieldName;
this.timestampPartitionFieldName = timestampPartitionFieldName;
}

/**
@@ -84,9 +88,15 @@ public void updateSchema(TableId table, String topic) {
// package private for testing.
TableInfo constructTableInfo(TableId table, Schema kafkaKeySchema, Schema kafkaValueSchema) {
com.google.cloud.bigquery.Schema bigQuerySchema = getBigQuerySchema(kafkaKeySchema, kafkaValueSchema);

TimePartitioning timePartitioning = TimePartitioning.of(Type.DAY);
if (timestampPartitionFieldName.isPresent()) {
timePartitioning = timePartitioning.toBuilder().setField(timestampPartitionFieldName.get()).build();
}

StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
.setSchema(bigQuerySchema)
.setTimePartitioning(timePartitioning)
.build();
TableInfo.Builder tableInfoBuilder =
TableInfo.newBuilder(table, tableDefinition);
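
For context on the hunk above: a minimal, self-contained sketch (not part of this PR's diff) of what the new constructTableInfo path does with the google-cloud-bigquery API. It builds a day-partitioned table definition, switching from ingestion-time to field-based partitioning when a field name is configured. The dataset myDataset, table myTable, and TIMESTAMP field ts are hypothetical placeholders.

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.TimePartitioning.Type;
import java.util.Optional;

public class TimestampPartitioningSketch {
  public static void main(String[] args) {
    // Hypothetical record value field to partition by; Optional.empty()
    // would leave the table on ingestion-time partitioning.
    Optional<String> timestampPartitionFieldName = Optional.of("ts");

    // Default: day-based ingestion-time partitioning.
    TimePartitioning timePartitioning = TimePartitioning.of(Type.DAY);
    if (timestampPartitionFieldName.isPresent()) {
      // Switch to field-based partitioning on the configured column.
      timePartitioning = timePartitioning.toBuilder()
          .setField(timestampPartitionFieldName.get())
          .build();
    }

    com.google.cloud.bigquery.Schema schema =
        com.google.cloud.bigquery.Schema.of(Field.of("ts", LegacySQLTypeName.TIMESTAMP));
    StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
        .setSchema(schema)
        .setTimePartitioning(timePartitioning)
        .build();
    TableInfo tableInfo =
        TableInfo.newBuilder(TableId.of("myDataset", "myTable"), tableDefinition).build();

    // Prints "ts"; with Optional.empty() above it would print null (ingestion-time).
    System.out.println(
        ((StandardTableDefinition) tableInfo.getDefinition()).getTimePartitioning().getField());
  }
}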

@@ -18,6 +18,7 @@
*/


import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

@@ -107,6 +108,16 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
+ "Default is true. Setting this to true appends partition decorator to table name (e.g. table$yyyyMMdd depending on the configuration set for bigQueryPartitionDecorator). "
+ "Setting this to false bypasses the logic to append the partition decorator and uses raw table name for inserts.";

public static final String BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG = "timestampPartitionFieldName";
private static final ConfigDef.Type BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_TYPE = ConfigDef.Type.STRING;
private static final String BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DEFAULT = null;
private static final ConfigDef.Importance BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_IMPORTANCE =
ConfigDef.Importance.LOW;
private static final String BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DOC =
"The name of the field in the value that contains the timestamp to partition by in BigQuery"
+ " and enable timestamp partitioning for each table. Leave this configuration blank,"
+ " to enable ingestion time partitioning for each table.";

static {
config = BigQuerySinkConfig.getConfig()
.define(
@@ -155,6 +166,12 @@ public class BigQuerySinkTaskConfig extends BigQuerySinkConfig {
BIGQUERY_PARTITION_DECORATOR_DEFAULT,
BIGQUERY_PARTITION_DECORATOR_IMPORTANCE,
BIGQUERY_PARTITION_DECORATOR_DOC
).define(
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG,
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_TYPE,
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DEFAULT,
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_IMPORTANCE,
BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_DOC
);
}

@@ -175,6 +192,26 @@ private void checkAutoUpdateSchemas() {
}
}

/**
* Returns the field name to use for timestamp partitioning.
* @return An Optional containing the field name, or empty if none was configured.
*/
public Optional<String> getTimestampPartitionFieldName() {
return Optional.ofNullable(getString(BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG));
}

/**
* Check the validity of table partitioning configs.
*/
private void checkPartitionConfigs() {
if (getTimestampPartitionFieldName().isPresent() && getBoolean(BIGQUERY_PARTITION_DECORATOR_CONFIG)) {
throw new ConfigException(
"Only one partitioning configuration mode may be specified for the connector. "
+ "Use either bigQueryPartitionDecorator OR timestampPartitionFieldName."
);
}
}

public static ConfigDef getConfig() {
return config;
}
@@ -185,5 +222,6 @@ public static ConfigDef getConfig() {
public BigQuerySinkTaskConfig(Map<String, String> properties) {
super(config, properties);
checkAutoUpdateSchemas();
checkPartitionConfigs();
}
}
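
To illustrate the two partitioning modes the new check permits: a hedged sketch, not from the PR, that mirrors how the tests below build configs. It assumes baseProperties already contains the connector's other required settings, that "ts" is a hypothetical record value field, and that the config class lives under the connector's config package.

import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import java.util.HashMap;
import java.util.Map;

public class PartitioningConfigSketch {

  // Field-based partitioning: set timestampPartitionFieldName and disable the
  // partition decorator. Leaving bigQueryPartitionDecorator at its default (true)
  // would make the BigQuerySinkTaskConfig constructor throw a ConfigException.
  static BigQuerySinkTaskConfig fieldPartitioned(Map<String, String> baseProperties) {
    Map<String, String> props = new HashMap<>(baseProperties);
    props.put(BigQuerySinkTaskConfig.BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG, "ts");
    props.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "false");
    return new BigQuerySinkTaskConfig(props);
  }

  // Ingestion-time partitioning (the default): simply leave
  // timestampPartitionFieldName unset.
  static BigQuerySinkTaskConfig ingestionTimePartitioned(Map<String, String> baseProperties) {
    return new BigQuerySinkTaskConfig(new HashMap<>(baseProperties));
  }
}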

@@ -24,6 +24,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

@@ -33,45 +34,71 @@
import org.apache.kafka.connect.data.Schema;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Optional;

public class SchemaManagerTest {

private String testTableName = "testTable";
private String testDatasetName = "testDataset";
private String testDoc = "test doc";
private TableId tableId = TableId.of(testDatasetName, testTableName);

private SchemaRetriever mockSchemaRetriever;
private SchemaConverter<com.google.cloud.bigquery.Schema> mockSchemaConverter;
private BigQuery mockBigQuery;
private Schema mockKafkaSchema;
private com.google.cloud.bigquery.Schema fakeBigQuerySchema;

@Before
public void before() {
mockSchemaRetriever = mock(SchemaRetriever.class);
mockSchemaConverter =
(SchemaConverter<com.google.cloud.bigquery.Schema>) mock(SchemaConverter.class);
mockBigQuery = mock(BigQuery.class);
mockKafkaSchema = mock(Schema.class);
fakeBigQuerySchema = com.google.cloud.bigquery.Schema.of(
Field.of("mock field", LegacySQLTypeName.STRING));
}

@Test
public void testBQTableDescription() {
Optional<String> kafkaKeyFieldName = Optional.of("kafkaKey");
Optional<String> kafkaDataFieldName = Optional.of("kafkaData");
SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever, mockSchemaConverter,
mockBigQuery, kafkaKeyFieldName, kafkaDataFieldName, Optional.empty());

when(mockSchemaConverter.convertSchema(mockKafkaSchema)).thenReturn(fakeBigQuerySchema);
when(mockKafkaSchema.doc()).thenReturn(testDoc);

TableInfo tableInfo = schemaManager
.constructTableInfo(tableId, mockKafkaSchema, mockKafkaSchema);

Assert.assertEquals("Kafka doc does not match BigQuery table description",
testDoc, tableInfo.getDescription());
Assert.assertNull("Timestamp partition field name is not null",
((StandardTableDefinition) tableInfo.getDefinition()).getTimePartitioning().getField());
}

@Test
public void testTimestampPartitionSet() {
Optional<String> testField = Optional.of("testField");
SchemaManager schemaManager = new SchemaManager(mockSchemaRetriever, mockSchemaConverter,
mockBigQuery, Optional.empty(), Optional.empty(), testField);

when(mockSchemaConverter.convertSchema(mockKafkaSchema)).thenReturn(fakeBigQuerySchema);
when(mockKafkaSchema.doc()).thenReturn(testDoc);

TableInfo tableInfo = schemaManager
.constructTableInfo(tableId, mockKafkaSchema, mockKafkaSchema);

Assert.assertEquals("Kafka doc does not match BigQuery table description",
testDoc, tableInfo.getDescription());
Assert.assertEquals("The field name does not match the field name of time partition",
testField.get(),
((StandardTableDefinition) tableInfo.getDefinition()).getTimePartitioning().getField());
}

}

@@ -18,6 +18,8 @@
*/


import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.wepay.kafka.connect.bigquery.SinkTaskPropertiesFactory;
@@ -62,6 +64,41 @@ public void testMaxWriteSize() {
*/
}

/**
* Test that the default for the field name is not present.
*/
@Test
public void testEmptyTimestampPartitionFieldName() {
Map<String, String> configProperties = propertiesFactory.getProperties();
BigQuerySinkTaskConfig testConfig = new BigQuerySinkTaskConfig(configProperties);
assertFalse(testConfig.getTimestampPartitionFieldName().isPresent());
}

/**
* Test that a non-empty field name combined with the decorator default (true) raises a ConfigException.
*/
@Test (expected = ConfigException.class)
public void testTimestampPartitionFieldNameError() {
Map<String, String> configProperties = propertiesFactory.getProperties();
configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG, "name");
new BigQuerySinkTaskConfig(configProperties);
}

/**
* Test that a non-empty field name with the decorator set to false passes validation.
*/
@Test
public void testTimestampPartitionFieldName() {
Map<String, String> configProperties = propertiesFactory.getProperties();
configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_TIMESTAMP_PARTITION_FIELD_NAME_CONFIG, "name");
configProperties.put(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG, "false");
BigQuerySinkTaskConfig testConfig = new BigQuerySinkTaskConfig(configProperties);
assertTrue(testConfig.getTimestampPartitionFieldName().isPresent());
assertFalse(testConfig.getBoolean(BigQuerySinkTaskConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG));
}

@Test(expected = ConfigException.class)
public void testAutoSchemaUpdateWithoutRetriever() {
Map<String, String> badConfigProperties = propertiesFactory.getProperties();