CC-7246: add ability to partition based on timestamp of a record value field #214
Conversation
Signed-off-by: Lev Zemlyanov <lev@confluent.io>
@wicknicks @gharris1727 @aakashnshah any reviews would be appreciated
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/SchemaManager.java
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
@@ -155,6 +155,12 @@ private RowToInsert getRecordRow(SinkRecord record) {
      convertedRecord = FieldNameSanitizer.replaceInvalidKeys(convertedRecord);
    }

    if (config.useTimestampPartitioning()) {
      if (!convertedRecord.containsKey(config.getTimestampPartitionFieldName())) {
When/how would this happen? It looks like the first if statement accounts for the field name being non-empty. Of course that doesn't ensure the record contains the field, but I still wanted to know.
This means the record doesn't contain the field. RecordConverter returns a map of all the field names to their values, so that's how I check the struct.
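The check described above can be sketched as a small self-contained example. This is an illustrative stand-in, not the connector's actual code: `convertRecord`, the field names, and the exception type are all assumptions, but it shows the idea of validating a converter's field-name-to-value map with `containsKey` before relying on the timestamp-partition field.

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionFieldCheck {

    // Hypothetical stand-in for RecordConverter: returns a map of field names to values.
    static Map<String, Object> convertRecord() {
        Map<String, Object> converted = new HashMap<>();
        converted.put("id", 42L);
        converted.put("event_ts", "2019-11-27T00:00:00Z");
        return converted;
    }

    // Reject records that lack the configured timestamp-partition field.
    static void validatePartitionField(Map<String, Object> record, String fieldName) {
        if (!record.containsKey(fieldName)) {
            throw new IllegalArgumentException(
                "Record is missing timestamp partition field: " + fieldName);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> record = convertRecord();
        validatePartitionField(record, "event_ts"); // present: passes
        try {
            validatePartitionField(record, "missing_ts"); // absent: rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```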
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java
final String testTableName = "testTable";
final String testDatasetName = "testDataset";
final String testDoc = "test doc";
final TableId tableId = TableId.of(testDatasetName, testTableName);
Can you move these outside of the function, since you use them multiple times?
kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SchemaManagerTest.java
com.google.cloud.bigquery.Schema fakeBigQuerySchema =
    com.google.cloud.bigquery.Schema.of(Field.of("mock field", LegacySQLTypeName.STRING));
You can use the mockito-inline package :)
I'm following the original tests; I want to keep this a non-invasive addition.
@mtagle would love some eyes on this :)
I tried this and I was getting an error: "Streaming to metadata partition of column based partitioning table $20191127 is disallowed." It looks like the reason is that, with column-based partitions, you shouldn't supply the partition explicitly; you just supply the table name and BigQuery sorts out the partitioning itself. When I updated the … it seems to create the table fine, though. Source: https://stackoverflow.com/a/50006560
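The distinction behind this error can be sketched as a small example. This is an illustrative assumption, not connector code: with ingestion-time partitioning you may stream to an explicit partition via a `table$YYYYMMDD` decorator, but with column-based partitioning the decorator is disallowed and you stream to the bare table name, letting BigQuery route rows by the column value.

```java
public class PartitionTarget {

    // Choose the streaming target: bare table for column-based partitioning,
    // "$YYYYMMDD"-decorated table for ingestion-time partitioning.
    static String targetTable(String table, boolean columnBasedPartitioning, String day) {
        if (columnBasedPartitioning) {
            return table; // decorator disallowed: BigQuery routes rows by the timestamp column
        }
        return table + "$" + day; // explicit partition decorator
    }

    public static void main(String[] args) {
        System.out.println(targetTable("events", false, "20191127")); // events$20191127
        System.out.println(targetTable("events", true, "20191127"));  // events
    }
}
```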
Actually, this PR does what I explained: #203
Superseded by #246, thus closing.
Implements this behavior:
https://cloud.google.com/bigquery/docs/partitioned-tables#date_timestamp_partitioned_tables
TL;DR: BigQuery can partition a table based on a column that contains a timestamp, so by passing a field name from the record's struct to BigQuery, the connector specifies which column to partition by.
Signed-off-by: Lev Zemlyanov lev@confluent.io
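The column-based partitioning described in the linked docs can be sketched with the google-cloud-bigquery client library. This is a non-runnable fragment (it requires that library and an existing `schema` object); the column name `event_ts` is an assumption. Setting a field on `TimePartitioning` is what tells BigQuery which timestamp column to partition by.

```java
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TimePartitioning;

// Partition the table by day on the value of the (assumed) "event_ts" column.
// Omitting setField would instead give ingestion-time partitioning.
TimePartitioning partitioning = TimePartitioning.newBuilder(TimePartitioning.Type.DAY)
    .setField("event_ts")
    .build();

StandardTableDefinition tableDefinition = StandardTableDefinition.newBuilder()
    .setSchema(schema) // 'schema' assumed to be built elsewhere
    .setTimePartitioning(partitioning)
    .build();
```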