Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CC-7246: add ability to partition based on timestamp of a record value field #246

Merged
merged 5 commits into from
Feb 27, 2020

Conversation

dosvath
Copy link
Contributor

@dosvath dosvath commented Feb 11, 2020

Replaces #214. Implements additional validation for the configs to either use decorators or field name for partitioning. Adds ability to partition BQ tables based on a field name that contains timestamp information.

Performed Manual Integration Tests

Tokens PROJECT_ID, DATASET_ID, TABLE_ID are used as substitutes in this example for the actual IDs.

  1. Start connector on local confluent cluster with the configurations:
name=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1

topics=TABLE_ID
sanitizeTopics=true

autoUpdateSchemas=true
bigQueryPartitionDecorator=false
timestampPartitionFieldName=f2

schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081

bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000

########################################### Fill me in! ###########################################
# The name of the BigQuery project to write to
project=PROJECT_ID
# The name of the BigQuery dataset to write to (leave the '.*=' at the beginning, enter your
# dataset after it)
datasets=.*=DATASET_ID
# The location of a BigQuery service account JSON key file
keyfile=FILEPATH
  1. Send a few records using the avro producer, with various different dates, for example:
echo '{"f1":"Testing the Kafka-BigQuery partitioning!", "f2": 1580513872000}' | ./kafka-avro-console-producer --broker-list localhost:9092 --topic TABLE_ID --property value.schema='{"type": "record", "name": "myrecord", "fields": [{ "name": "f1","type": "string"},{"name": "f2","type": {"connect.name": "org.apache.kafka.connect.data.Timestamp","connect.version": 1,"logicalType": "timestamp-millis","type":"long"}}]}'
  1. Using the BigQuery console, run the following query to verify the existing partitions:
SELECT f2 as pt, FORMAT_TIMESTAMP("%Y%m%d", f2) as partition_id
FROM `PROJECT_ID.DATASET_ID.TABLE_ID`
GROUP BY f2
ORDER BY f2

The output will list all the rows that have different dates, with an additional column of partition_id. Records that have a timestamp within a day's range will have the same partition_id as expected.

@claassistantio
Copy link

claassistantio commented Feb 11, 2020

CLA assistant check
All committers have signed the CLA.

Copy link

@levzem levzem left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@wicknicks wicknicks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one nit. thanks for taking this up, @dosvath!

…config/BigQuerySinkTaskConfigTest.java

Co-Authored-By: Arjun Satish <wicknicks@users.noreply.github.com>
Copy link
Contributor

@wicknicks wicknicks left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks, @dosvath!

@wicknicks
Copy link
Contributor

@mtagle @bingqinzhou can you please take a look at this PR? thanks in advance!

@criccomini
Copy link
Contributor

@mtagle @whynick1 PTAL

Copy link
Contributor

@whynick1 whynick1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, a solid PR 👍
Just one concern,timestampPartitionFieldName takes a string. What if a we have different topic with different partition field?

@dosvath
Copy link
Contributor Author

dosvath commented Feb 27, 2020

In general, a solid PR 👍
Just one concern,timestampPartitionFieldName takes a string. What if a we have different topic with different partition field?

For this PR only one partition field name is supported, which means if you'd like a second topic that has a different partition field name, you would have to start another connector. Alternatively, you could create the table for the second topic manually, and specify the field, then use the connector with the new topic added to the topics configuration. (If the table you're trying to write to already exists before starting the connector, and the schema doesn't need updating, the single timestampPartitionFieldName string config will not be used)

In the future, functionality could be added to support a list of partitionFieldName's that correspond to the number and order of the topics given to the connector.

@skyzyx
Copy link
Contributor

skyzyx commented Feb 27, 2020

Codecov Report

Merging #246 into master will increase coverage by 1.88%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master     #246      +/-   ##
============================================
+ Coverage     69.05%   70.94%   +1.88%     
- Complexity      277      289      +12     
============================================
  Files            32       32              
  Lines          1477     1504      +27     
  Branches        155      156       +1     
============================================
+ Hits           1020     1067      +47     
+ Misses          409      383      -26     
- Partials         48       54       +6     
Impacted Files Coverage Δ Complexity Δ
...afka/connect/bigquery/write/row/GCSToBQWriter.java 80.00% <0.00%> (-6.80%) 13.00% <0.00%> (ø%)
...wepay/kafka/connect/bigquery/BigQuerySinkTask.java 60.31% <0.00%> (-0.94%) 26.00% <0.00%> (-3.00%)
...onnect/bigquery/config/BigQuerySinkTaskConfig.java 94.64% <0.00%> (-1.11%) 8.00% <0.00%> (+3.00%)
...nect/bigquery/write/batch/GCSBatchTableWriter.java 86.66% <0.00%> (+0.95%) 3.00% <0.00%> (ø%) ⬆️
...om/wepay/kafka/connect/bigquery/SchemaManager.java 73.33% <0.00%> (+7.47%) 7.00% <0.00%> (+3.00%) ⬇️
...fka/connect/bigquery/write/row/BigQueryWriter.java 94.73% <0.00%> (+5.26%) 21.00% <0.00%> (+2.00%) ⬇️
...afka/connect/bigquery/write/batch/TableWriter.java 66.66% <0.00%> (+5.55%) 5.00% <0.00%> (+1.00%) ⬇️
...ect/bigquery/write/row/AdaptiveBigQueryWriter.java 40.62% <0.00%> (+40.62%) 6.00% <0.00%> (+6.00%) ⬇️

@levzem
Copy link

levzem commented Feb 27, 2020

In general, a solid PR 👍
Just one concern,timestampPartitionFieldName takes a string. What if a we have different topic with different partition field?

Another possibility to @dosvath reply is to just add a SMT to rename the field to timestampPartitionFieldName and not have to worry about adding extended functionality.

@criccomini
Copy link
Contributor

@whynick1 Agree it's somewhat annoying to only support one name. Long-term this issue will go away with table router in #245, so I'm OK with merging this

@criccomini
Copy link
Contributor

@dosvath Do you need a maven central publication as well?

@criccomini criccomini merged commit 02f7688 into wepay:master Feb 27, 2020
@NathanNam
Copy link

@criccomini Could you release a new version with this improvement? We'd like to unblock some customers.

@apoorvmittal10
Copy link
Contributor

@NathanNam @criccomini Shall we also make a fix for below mentioned issue before releasing?
https://github.com/wepay/kafka-connect-bigquery/pull/248/files#r386433746

@criccomini
Copy link
Contributor

@apoorvmittal10 @NathanNam waiting for your guidance on whether release is required now, or you want to wait to fix the issue @apoorvmittal10 linked to.

@apoorvmittal10
Copy link
Contributor

@criccomini I shall say we shall go ahead with #246 while reverting #248 PR.

@criccomini
Copy link
Contributor

We can't revert 248--we are seeing problems with throttling right now without that patch. I think we should fix forward.

@wicknicks
Copy link
Contributor

fix forward makes sense. looks like @apoorvmittal10 made a PR here for the fix: #257. if that is all we need, can we prioritize reviewing it?

@criccomini
Copy link
Contributor

Yep-- Just pinged @bingqinzhou

@bingqinzhou
Copy link
Contributor

Merged #257 and released it as 1.6.1, waiting for the new version to show up on maven central.

@NathanNam
Copy link

Thank you all!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

10 participants