Connector Configuration

Configuring your Connector

topics

A list of Kafka topics to read from

  • Type: List of comma-delimited strings
  • Required (no default)

project

The BigQuery project to write to

  • Type: String
  • Required (no default)

datasets

Names for the datasets Kafka topics will write to (form of <topic regex>=<dataset>)

  • Type: List of comma-delimited strings
  • Required (no default)
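
Taken together, topics, project, and datasets are the minimum required settings. A sketch of a standalone connector properties file, assuming the connector class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector and using placeholder names (kcbq-quickstart, my-gcp-project, my_dataset):

    name=kcbq-quickstart-connector
    connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
    tasks.max=1
    # Read from this topic (placeholder name)
    topics=kcbq-quickstart
    # Write to this GCP project (placeholder)
    project=my-gcp-project
    # Route every topic (regex .*) to the dataset my_dataset (placeholder)
    datasets=.*=my_dataset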

gcsFolderName

The name of the folder under the bucket in which GCS blobs used for batch loading into BigQuery should be located. Only relevant if enableBatchLoad is configured.

  • Type: String
  • Default: ""

keyfile

The file containing a JSON key with BigQuery service account credentials

  • Type: String
  • Default: ""
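
keyfile and gcsFolderName typically appear together with the batch-load options referenced above. A hedged sketch; enableBatchLoad is mentioned in the gcsFolderName description but not documented on this page, and the path, topic, and folder names below are placeholders:

    # Service account credentials for BigQuery (placeholder path)
    keyfile=/etc/kcbq/service-account.json
    # Referenced above but documented elsewhere: topics to batch load via GCS
    enableBatchLoad=kcbq-batch-topic
    # Blobs used for batch loading are staged under this folder in the bucket
    gcsFolderName=kcbq-staging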

schemaRegistryLocation

The base URL of the Schema Registry instance to use

  • Type: String
  • Default: ""
  • Required if and only if autoCreateTables or autoUpdateSchemas is enabled

sanitizeTopics

Whether to automatically sanitize topic names before using them as table names; if not enabled, topic names will be used directly as table names

  • Type: boolean
  • Default: false

kafkaDataFieldName

The name of the BigQuery field in which Kafka data (topic, offset, and partition information) will be stored. If this config is set to null, Kafka data will not be included in the resulting BigQuery rows. Example: kafkaData

  • Type: String
  • Default: null

kafkaKeyFieldName

The name of the BigQuery field in which the Kafka record key will be stored. If this config is set to null, Kafka key information will not be included in the resulting BigQuery rows. Example: kafkaKey

  • Type: String
  • Default: null
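
Setting either field adds a correspondingly named column to every written row. A sketch using the example names from the two entries above:

    # Adds a kafkaData column with topic, partition, and offset
    kafkaDataFieldName=kafkaData
    # Adds a kafkaKey column holding the record key
    kafkaKeyFieldName=kafkaKey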

avroDataCacheSize

The size of the cache to use when converting schemas from Avro to Kafka Connect

  • Type: int
  • Default: 100

autoCreateTables

Automatically create BigQuery tables if they don't already exist

  • Type: boolean
  • Default: false

autoUpdateSchemas

Whether or not to automatically update BigQuery schemas

  • Type: boolean
  • Default: false
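
Because schemaRegistryLocation (above) is required whenever either of these options is enabled, the three settings usually travel together. A sketch with a placeholder registry URL:

    autoCreateTables=true
    autoUpdateSchemas=true
    # Required when either option above is enabled (placeholder URL)
    schemaRegistryLocation=http://localhost:8081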

bufferSize

The maximum number of records to buffer per table before temporarily halting the flow of new records, or -1 for unlimited buffering

  • Type: long
  • Default: 100000

bigQueryRetry

The number of retry attempts that will be made per BigQuery request that fails with a backend error

  • Type: int
  • Default: 0

bigQueryRetryWait

The amount of time, in milliseconds, to wait between retries of BigQuery requests that fail with a backend error

  • Type: long
  • Default: 1000
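
For example, to retry a failed request up to five times with a two-second pause between attempts (illustrative values, not recommendations):

    bigQueryRetry=5
    bigQueryRetryWait=2000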

topicsToTables

A list of mappings from topic regexes to table names (form of <topic regex>=<format string>). Note that each regex must include the capture groups referenced in its format string via placeholders (e.g. $1).

  • Type: List of comma-delimited strings
  • Default: null
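
For example, to strip a hypothetical kcbq- prefix from topic names and use the remainder as the table name:

    # $1 refers to the first capture group in the regex
    topicsToTables=kcbq-(.*)=$1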

batchWriter

The batch writer class to be used. At the moment there are only two options:

  • com.wepay.kafka.connect.bigquery.write.batch.DynamicBatchWriter
  • com.wepay.kafka.connect.bigquery.write.batch.SingleBatchWriter

See these classes for documentation.

  • Type: String
  • Default: com.wepay.kafka.connect.bigquery.write.batch.DynamicBatchWriter
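
For example, to switch from the default to the single-batch writer:

    batchWriter=com.wepay.kafka.connect.bigquery.write.batch.SingleBatchWriter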

threadPoolSize

The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.

  • Type: int
  • Default: 10

queueSize

The maximum size (or -1 for no maximum) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit: the queue may briefly exceed it before topics are paused. All topics are resumed once a flush is requested or the queue size drops below half of the maximum.

  • Type: int
  • Default: -1
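
bufferSize, threadPoolSize, and queueSize together govern write throughput and backpressure. A hedged tuning sketch; the values are illustrative only:

    # Allow up to 20 concurrent writes to BigQuery
    threadPoolSize=20
    # Buffer at most 50000 records per table before pausing intake
    bufferSize=50000
    # Pause all topics once roughly 10000 write requests are queued;
    # topics resume on flush or when the queue drains below half this size
    queueSize=10000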