Auto-create tables when batch insertion fails. #248

Merged
13 commits merged on Feb 26, 2020
8 changes: 5 additions & 3 deletions build.gradle
@@ -22,7 +22,7 @@ project.ext {
ioConfluentVersion = '5.3.1'
junitVersion = '4.12'
kafkaVersion = '2.3.0'
mockitoVersion = '1.10.19'
mockitoVersion = '3.2.4'
slf4jVersion = '1.6.1'
}

@@ -215,7 +215,8 @@ project(':kcbq-connector') {

testCompile (
"junit:junit:$junitVersion",
"org.mockito:mockito-core:$mockitoVersion"
"org.mockito:mockito-core:$mockitoVersion",
"org.mockito:mockito-inline:$mockitoVersion"
)
}

@@ -334,7 +335,8 @@ project('kcbq-confluent') {

testCompile (
"junit:junit:$junitVersion",
"org.mockito:mockito-core:$mockitoVersion"
"org.mockito:mockito-core:$mockitoVersion",
"org.mockito:mockito-inline:$mockitoVersion"
)
}

BigQuerySinkTask.java
@@ -142,8 +142,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) {

TableId baseTableId = topicsToBaseTableIds.get(record.topic());

maybeCreateTable(record, baseTableId);

PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId);
if(usePartitionDecorator) {

@@ -161,20 +159,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
return builder.build();
}

/**
* Create the table which doesn't exist in BigQuery for a (record's) topic when autoCreateTables config is set to true.
* @param record Kafka Sink Record to be streamed into BigQuery.
* @param baseTableId BaseTableId in BigQuery.
*/
private void maybeCreateTable(SinkRecord record, TableId baseTableId) {
BigQuery bigQuery = getBigQuery();
boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
if (autoCreateTables && bigQuery.getTable(baseTableId) == null) {
getSchemaManager(bigQuery).createTable(baseTableId, record.topic());
logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic());
}
}

private RowToInsert getRecordRow(SinkRecord record) {
Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
@@ -217,7 +201,8 @@ public void put(Collection<SinkRecord> records) {
if (!tableWriterBuilders.containsKey(table)) {
TableWriterBuilder tableWriterBuilder;
if (config.getList(config.ENABLE_BATCH_CONFIG).contains(record.topic())) {
String gcsBlobName = record.topic() + "_" + uuid + "_" + Instant.now().toEpochMilli();
String topic = record.topic();
String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();
String gcsFolderName = config.getString(config.GCS_FOLDER_NAME_CONFIG);
if (gcsFolderName != null && !"".equals(gcsFolderName)) {
gcsBlobName = gcsFolderName + "/" + gcsBlobName;
@@ -227,6 +212,7 @@ public void put(Collection<SinkRecord> records) {
table.getBaseTableId(),
config.getString(config.GCS_BUCKET_NAME_CONFIG),
gcsBlobName,
topic,
recordConverter);
} else {
tableWriterBuilder =
@@ -283,15 +269,18 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) {
}

private BigQueryWriter getBigQueryWriter() {
boolean updateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
boolean autoUpdateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG);
boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
BigQuery bigQuery = getBigQuery();
if (updateSchemas) {
if (autoUpdateSchemas || autoCreateTables) {
return new AdaptiveBigQueryWriter(bigQuery,
getSchemaManager(bigQuery),
retry,
retryWait);
retryWait,
autoUpdateSchemas,
autoCreateTables);
} else {
return new SimpleBigQueryWriter(bigQuery, retry, retryWait);
}
@@ -312,10 +301,13 @@ private GCSToBQWriter getGcsWriter() {
BigQuery bigQuery = getBigQuery();
int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
return new GCSToBQWriter(getGcs(),
bigQuery,
getSchemaManager(bigQuery),
Contributor:
@bingqinzhou @mtagle @wicknicks Seems like there is now no way to use this connector without specifying a schema registry.
getGcsWriter() is called at connector start and expects schema registry details. I am using the connector for JSON data, which doesn't require a schema registry, and the connector is throwing an exception:

org.apache.kafka.common.config.ConfigException: Cannot request new instance of SchemaRetriever when class has not been specified
	at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getSchemaRetriever(BigQuerySinkConfig.java:555)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getSchemaManager(BigQuerySinkTask.java:263)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.getGcsWriter(BigQuerySinkTask.java:309)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.start(BigQuerySinkTask.java:330)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2020-03-02 20:04:16,203] ERROR WorkerSinkTask{id=bigquery-connector-9} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-03-02 20:04:16,203] WARN Could not stop task (org.apache.kafka.connect.runtime.WorkerSinkTask:160)
java.lang.NullPointerException
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.stop(BigQuerySinkTask.java:366)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:158)
	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
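
One way to address this (a minimal sketch, assuming the BigQuerySinkTask fields and helpers shown in the diff above; not the project's actual fix) would be to resolve the SchemaManager lazily, so a SchemaRetriever is only required when tables can actually be auto-created:

private GCSToBQWriter getGcsWriter() {
  BigQuery bigQuery = getBigQuery();
  int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG);
  long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG);
  boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG);
  // Hypothetical: skip getSchemaManager() when tables will never be auto-created,
  // so plain JSON pipelines without a schema registry can still start.
  SchemaManager schemaManager = autoCreateTables ? getSchemaManager(bigQuery) : null;
  return new GCSToBQWriter(getGcs(),
                           bigQuery,
                           schemaManager,
                           retry,
                           retryWait,
                           autoCreateTables);
}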

retry,
retryWait);
retryWait,
autoCreateTables);
}

@Override
GCSBatchTableWriter.java
@@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Batch Table Writer that uploads records to GCS as a blob
@@ -45,6 +44,7 @@ public class GCSBatchTableWriter implements Runnable {

private final String bucketName;
private final String blobName;
private final String topic;

private final List<RowToInsert> rows;
private final GCSToBQWriter writer;
@@ -56,15 +56,18 @@ public class GCSBatchTableWriter implements Runnable {
* @param bucketName the name of the GCS bucket where the blob should be uploaded
* @param baseBlobName the base name of the blob in which the serialized rows should be uploaded.
* The full name is [baseBlobName]_[writerId]_
* @param topic Kafka record topic
*/
private GCSBatchTableWriter(List<RowToInsert> rows,
GCSToBQWriter writer,
TableId tableId,
String bucketName,
String baseBlobName) {
String baseBlobName,
String topic) {
Contributor: nit: add doc to topic
Contributor (author): Added doc~

this.tableId = tableId;
this.bucketName = bucketName;
this.blobName = baseBlobName;
this.topic = topic;

this.rows = rows;
this.writer = writer;
@@ -73,7 +76,7 @@ private GCSBatchTableWriter(List<RowToInsert> rows,
@Override
public void run() {
try {
writer.writeRows(rows, tableId, bucketName, blobName);
writer.writeRows(rows, tableId, bucketName, blobName, topic);
} catch (ConnectException ex) {
throw new ConnectException("Failed to write rows to GCS", ex);
} catch (InterruptedException ex) {
@@ -87,6 +90,7 @@ public void run() {
public static class Builder implements TableWriterBuilder {
private final String bucketName;
private String blobName;
private String topic;

private final TableId tableId;

@@ -101,16 +105,19 @@ public static class Builder implements TableWriterBuilder {
* @param tableId The bigquery table to be written to.
* @param gcsBucketName The GCS bucket to write to.
* @param gcsBlobName The name of the GCS blob to write.
* @param topic Kafka record topic
* @param recordConverter the {@link RecordConverter} to use.
*/
public Builder(GCSToBQWriter writer,
TableId tableId,
String gcsBucketName,
String gcsBlobName,
String topic,
RecordConverter<Map<String, Object>> recordConverter) {

this.bucketName = gcsBucketName;
this.blobName = gcsBlobName;
this.topic = topic;

this.tableId = tableId;

@@ -133,7 +140,7 @@ public void addRow(RowToInsert rowToInsert) {
}

public GCSBatchTableWriter build() {
return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName);
return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName, topic);
}
}
}
AdaptiveBigQueryWriter.java
@@ -19,6 +19,7 @@


import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.InsertAllRequest;
@@ -37,16 +38,20 @@
import java.util.Map;

/**
* A {@link BigQueryWriter} capable of updating BigQuery table schemas.
* A {@link BigQueryWriter} capable of updating BigQuery table schemas and automatically creating tables that do not exist.
*/
public class AdaptiveBigQueryWriter extends BigQueryWriter {
private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class);

// The maximum number of retries we will attempt to write rows after updating a BQ table schema.
private static final int AFTER_UPDATE_RETY_LIMIT = 5;
// The maximum number of retries we will attempt to write rows after creating a table or updating a BQ table schema.
private static final int RETRY_LIMIT = 5;
// Wait about 30s between retries, since both creating a table and updating a schema can take up to 2~3 minutes to take effect.
private static final int RETRY_WAIT_TIME = 30000;

private final BigQuery bigQuery;
private final SchemaManager schemaManager;
private final boolean autoUpdateSchemas;
private final boolean autoCreateTables;

/**
* @param bigQuery Used to send write requests to BigQuery.
@@ -57,10 +62,14 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter {
public AdaptiveBigQueryWriter(BigQuery bigQuery,
SchemaManager schemaManager,
int retry,
long retryWait) {
long retryWait,
boolean autoUpdateSchemas,
boolean autoCreateTables) {
super(retry, retryWait);
this.bigQuery = bigQuery;
this.schemaManager = schemaManager;
this.autoUpdateSchemas = autoUpdateSchemas;
this.autoCreateTables = autoCreateTables;
}

private boolean isTableMissingSchema(BigQueryException exception) {
@@ -69,6 +78,12 @@ private boolean isTableMissingSchema(BigQueryException exception) {
return exception.getReason() != null && exception.getReason().equalsIgnoreCase("invalid");
}

private boolean isTableNotExistedException(BigQueryException exception) {
// If a table does not exist, insertAll raises a BigQueryException with a 404 (notFound) code.
// See the Google Cloud error codes doc: https://cloud.google.com/bigquery/docs/error-messages?hl=en
return exception.getCode() == 404;
}

/**
* Sends the request to BigQuery, then checks the response to see if any errors have occurred. If
* any have, and all errors can be blamed upon invalid columns in the rows sent, attempts to
@@ -86,21 +101,24 @@ public Map<Long, List<BigQueryError>> performWriteRequest(
try {
request = createInsertAllRequest(tableId, rows);
writeResponse = bigQuery.insertAll(request);
// Should only perform one schema update attempt; may have to continue insert attempts due to
// BigQuery schema updates taking up to two minutes to take effect
// Should only perform one schema update attempt.
if (writeResponse.hasErrors()
&& onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
&& onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdateSchemas) {
attemptSchemaUpdate(tableId, topic);
}
} catch (BigQueryException exception) {
if (isTableMissingSchema(exception)) {
// Should only perform one table creation attempt.
if (isTableNotExistedException(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) {
attemptTableCreate(tableId.getBaseTableId(), topic);
} else if (isTableMissingSchema(exception) && autoUpdateSchemas) {
attemptSchemaUpdate(tableId, topic);
} else {
throw exception;
}
}

// Schema update might be delayed, so multiple insertion attempts may be necessary
// Creating tables or updating table schemas in BigQuery can take up to 2~3 minutes to take effect,
// so multiple insertion attempts may be necessary.
int attemptCount = 0;
while (writeResponse == null || writeResponse.hasErrors()) {
logger.trace("insertion failed");
Expand All @@ -117,10 +135,15 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) {
return writeResponse.getInsertErrors();
}
Contributor:
// Schema update might be delayed, so multiple insertion attempts may be necessary
Doc needs an update (the insert might also be delayed ...)

bingqinzhou (Contributor, Author), Feb 26, 2020:
Updated the doc accordingly~

attemptCount++;
if (attemptCount >= AFTER_UPDATE_RETY_LIMIT) {
if (attemptCount >= RETRY_LIMIT) {
throw new BigQueryConnectException(
"Failed to write rows after BQ schema update within "
+ AFTER_UPDATE_RETY_LIMIT + " attempts for: " + tableId.getBaseTableId());
+ RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
}
try {
Thread.sleep(RETRY_WAIT_TIME);
whynick1 (Contributor), Feb 25, 2020:
If the retry is caused by a schema update, we don't need sleep(). I am thinking maybe we can refactor the code to:

try {
  bigQuery.insertAll(request);
} catch (BigQueryException exception) {
  if (isTableNotExisted) {
    attemptTableCreate(tableId, topic);
    retryInsert(request, INFINITE, 30);
  } else if (isTableMissingSchema) {
    attemptSchemaUpdate(tableId, topic);
    retryInsert(request, 5, 0);
  } else {
    throw exception;
  }
}

public void retryInsert(request, maxRetry, waitTime) {
  while (count < maxRetry) {
    try {
      bigQuery.insertAll(request);
      return;
    } catch (BigQueryException e) {
      count++;
      sleep(waitTime);
      logger.info("I am sleeping");
    }
  }
}

bingqinzhou (Contributor, Author), Feb 26, 2020:
According to the comment here https://github.com/wepay/kafka-connect-bigquery/blob/master/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java#L90-L91, updating a schema in BigQuery is also an async process that takes up to 2 minutes. I think we'll want to wait for some time (like 30s) on each retry for schema updates as well.
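
For reference, a Java sketch of the retryInsert helper being proposed (illustrative only — the names and error handling are assumptions sketched from the diff above, not the project's final code):

private void retryInsert(InsertAllRequest request, int maxRetries, long waitMs) {
  int attempts = 0;
  while (true) {
    try {
      InsertAllResponse response = bigQuery.insertAll(request);
      if (!response.hasErrors()) {
        return; // all rows written successfully
      }
    } catch (BigQueryException exception) {
      // fall through and retry below
    }
    attempts++;
    if (attempts >= maxRetries) {
      throw new BigQueryConnectException(
          "Failed to write rows within " + maxRetries + " attempts");
    }
    try {
      // e.g. ~30s while a table creation or schema update propagates, 0 otherwise
      Thread.sleep(waitMs);
    } catch (InterruptedException e) {
      // no-op, keep retrying the insert
    }
  }
}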

Contributor:
nit: I would suggest adding more comments in the function or cleaning it up a little bit. It is not very easy to read for people unfamiliar with it.

bingqinzhou (Contributor, Author):
Cleaned up the existing comments to the best of my ability.

} catch (InterruptedException e) {
// no-op, we want to keep retrying the insert
}
}
logger.debug("table insertion completed successfully");
@@ -136,6 +159,16 @@ private void attemptSchemaUpdate(PartitionedTableId tableId, String topic) {
}
}

private void attemptTableCreate(TableId tableId, String topic) {
try {
schemaManager.createTable(tableId, topic);
logger.info("Table {} does not exist, auto-created table for topic {}", tableId, topic);
} catch (BigQueryException exception) {
throw new BigQueryConnectException(
"Failed to create table " + tableId, exception);
}
}

/*
* Currently, the only way to determine the cause of an insert all failure is by examining the map
* object returned by the insertErrors() method of an insert all response. The only way to