Kafka Connectors Shared Logic

Vishwajith Shivappa edited this page Feb 21, 2020 · 24 revisions

Configuration

The configuration properties below are shared across all Kafka connectors in Brooklin.

Remember:

| Property | Description | Default |
| --- | --- | --- |
| defaultKeySerde | Name of the SerDe (Serializer/Deserializer) to use with the Kafka topic key. The name has to match one of the values specified in brooklin.server.serdeNames. | (None) |
| defaultValueSerde | Name of the SerDe (Serializer/Deserializer) to use with the Kafka topic value. The name has to match one of the values specified in brooklin.server.serdeNames. | (None) |
| isGroupIdHashingEnabled | A flag indicating whether the Kafka consumer group ID should be hashed. If true, the consumer group ID for a datastream is set to <clusterName>.<groupIdMD5Hash>. If false, the consumer group ID for a datastream is set to <sourceConnectionString>-to-<destConnectionString>. | false |
| commitIntervalMs | The time duration (in milliseconds) between successive offset commits to Kafka. Must be in the range [0, Long.MAX_VALUE] (inclusive). | 60000 (= 1 min) |
| commitTimeout | The timeout (in milliseconds) to spend waiting in calls to commitSync on the KafkaConsumer. Must be in the range [0, Long.MAX_VALUE] (inclusive). | 30000 (= 30 sec) |
| pollTimeoutMs | The timeout (in milliseconds) to spend waiting in poll calls to Kafka if no data is available. A value of 0 causes the KafkaConsumer to return immediately if no data is available. Must be in the range [0, Long.MAX_VALUE] (inclusive). | 30000 (= 30 sec) |
| retryCount | The maximum number of poll attempts to Kafka in case of failure | 5 |
| retrySleepDurationMs | The time duration (in milliseconds) to wait between successive poll attempts to Kafka in case of failure | 5000 (= 5 sec) |
| pausePartitionOnError | A flag indicating whether to auto-pause a topic partition if dispatching its data for delivery to the destination system fails | false |
| pauseErrorPartitionDurationMs | The time duration (in milliseconds) to keep a topic partition paused after encountering send errors, before attempting to auto-resume | 600000 (= 10 min) |
| daemonThreadIntervalInSeconds | The time duration between successive attempts to restart unhealthy DatastreamTasks. Also used as the initial delay before checking the health status of DatastreamTasks for the first time. | 300 sec |
| nonGoodStateThresholdMs | The maximum time duration (in milliseconds) to allow between successive polls from Kafka, before a DatastreamTask is deemed unhealthy. Must be in the range [60000, Long.MAX_VALUE] (inclusive). | 600000 (= 10 min) |
| processingDelayLogThreshold | The maximum time duration (in milliseconds) to allow between consuming data from Kafka and dispatching it for delivery to destination, before incrementing numProcessingOverThreshold | 60000 (= 1 min) |
| consumerFactoryClassName | | KafkaConsumerFactoryImpl |
| consumer.* | Kafka consumer configuration properties | (None) |
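
The documented defaults and value ranges can be checked before a configuration is handed to a connector. The following is a minimal sketch, not Brooklin's own validation code: the `resolve_config` helper and the two dictionaries are hypothetical names introduced for illustration, and only the numeric properties whose defaults or ranges are listed above are covered.

```python
LONG_MAX = 2**63 - 1  # value of Java's Long.MAX_VALUE

# Defaults copied from the configuration table above.
DEFAULTS = {
    "commitIntervalMs": 60_000,
    "commitTimeout": 30_000,
    "pollTimeoutMs": 30_000,
    "retryCount": 5,
    "retrySleepDurationMs": 5_000,
    "pauseErrorPartitionDurationMs": 600_000,
    "nonGoodStateThresholdMs": 600_000,
    "processingDelayLogThreshold": 60_000,
}

# Inclusive ranges stated in the table; properties with no stated range are not checked.
RANGES = {
    "commitIntervalMs": (0, LONG_MAX),
    "commitTimeout": (0, LONG_MAX),
    "pollTimeoutMs": (0, LONG_MAX),
    "nonGoodStateThresholdMs": (60_000, LONG_MAX),
}


def resolve_config(overrides):
    """Hypothetical helper: merge overrides onto the defaults and check the documented ranges."""
    config = {**DEFAULTS, **overrides}
    for name, (low, high) in RANGES.items():
        value = config[name]
        if not low <= value <= high:
            raise ValueError(f"{name}={value} is outside [{low}, {high}]")
    return config
```

For example, `resolve_config({"pollTimeoutMs": 0})` is accepted (a poll that returns immediately), while `resolve_config({"nonGoodStateThresholdMs": 59_999})` raises `ValueError` because the documented minimum is 60000.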

Diagnostics

The diagnostic endpoints below are shared across all Kafka connectors in Brooklin.

URL

GET /diag?q=:query&type=:component&scope=:scope&content=:componentQuery

URL Params

Required

  • query

    • Possible values are: status or allStatus

    • status retrieves data for a single Brooklin instance

    • allStatus retrieves aggregated data for all Brooklin instances in the cluster

  • type: only supported value is connector

  • scope: Name of the connector to query, as specified in brooklin.server.connectorNames

  • content

    • Represents a subquery to the component (connector) in question

    • Possible values are: datastream_state, partitions, and position

      Subquery

      datastream_state

      Subquery Params

      Required

      datastreamName: Name of datastream to query

      Example

      datastream_state?datastream=:datastreamName

      Constraints

      • Datastream must exist

      Response

      Content-Type

      application/json

      Schema
      {
        "elements": [
        ],
        "paging": {
          "count": "[int]",
          "start": "[int]",
          "links": "[array]"
        }
      }

      Subquery

      partitions

      Response

      Content-Type

      application/json

      Schema
      {
        "elements": [
        ],
        "paging": {
          "count": "[int]",
          "start": "[int]",
          "links": "[array]"
        }
      }
      • If query was set to status or allStatus: each ServerComponentHealth object will contain a JSON-serialized string in its status field. This string is the serialization of a list of KafkaTopicPartitionStatsResponse

      [
        {
          "consumerGroupId": "[string]",
          "topicPartitions": {
            "topicName": "[array of partitions]"
          },
          "datastreams": []
        }
      ]
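
As an illustration of the endpoint described above, the sketch below builds the request path and decodes the doubly serialized status field. It rests on assumptions: `build_diag_path` and `decode_partition_stats` are hypothetical helper names, the connector and datastream names in the usage note are placeholders, and the elements array is assumed to hold the ServerComponentHealth objects.

```python
import json
from urllib.parse import urlencode


def build_diag_path(query, connector_name, content):
    """Build the /diag request path; `content` is the subquery, URL-encoded as one value."""
    if query not in ("status", "allStatus"):
        raise ValueError("query must be 'status' or 'allStatus'")
    params = {
        "q": query,
        "type": "connector",  # the only supported component type
        "scope": connector_name,
        "content": content,
    }
    return "/diag?" + urlencode(params)


def decode_partition_stats(response_body):
    """Decode each element's `status` string into a list of partition stats objects.

    Assumes every entry in `elements` is a ServerComponentHealth object whose
    `status` field holds a JSON-serialized list.
    """
    response = json.loads(response_body)
    return [json.loads(element["status"]) for element in response["elements"]]
```

With placeholder names, `build_diag_path("status", "kafkaConnector", "datastream_state?datastream=my-stream")` yields `/diag?q=status&type=connector&scope=kafkaConnector&content=datastream_state%3Fdatastream%3Dmy-stream`. The subquery's own `?` and `=` are percent-encoded so that they are not read as part of the outer query string.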

Metrics

The metrics below are shared across all Kafka connectors in Brooklin.

General Metrics

General metrics prefix: <connectorName>.

| Metric Name | Description |
| --- | --- |
| numDatastreams | The number of datastreams using the connector in the entire cluster |
| numDatastreamTasks | The number of datastream tasks that belong to datastreams using the connector in the entire cluster |

Aggregate Metrics

  • Aggregate metrics cover all datastreams in a single Brooklin instance.

  • Aggregate metrics prefix: <connectorName>.<connectorTask>.aggregate.

| Metric Name | Description |
| --- | --- |
| clientPollOverTimeout | The number of times a call to KafkaConsumer::poll(long) exceeds pollTimeoutMs by more than 1 sec |
| errorRate | The rate of errors encountered when data is dispatched for delivery to the destination system |
| eventsByteProcessedRate | The rate of bytes processed and dispatched for delivery to destination |
| eventsProcessedRate | The rate of Kafka records processed and dispatched for delivery to destination |
| numAutoPausedPartitionsAwaitingDestTopic | The number of auto-paused topic partitions awaiting destination topic creation |
| numAutoPausedPartitionsOnError | The number of topic partitions auto-paused due to errors encountered during dispatch for delivery |
| numAutoPausedPartitionsOnInFlightMessages | The number of topic partitions auto-paused due to exceeding their maximum in-flight messages thresholds |
| numConfigPausedPartitions | The number of topic partitions paused manually |
| numPartitions | The number of Kafka topic partitions |
| numProcessingOverThreshold | The number of times dispatching records to destination exceeds processingDelayLogThreshold |
| numTopics | The number of Kafka topics |
| pollIntervalOverSessionTimeout | The number of polls exceeding the configured maximum session timeout for KafkaConsumer |
| rebalanceRate | The rate of rebalances seen by the Kafka consumer |
| stuckPartitions | The number of stuck topic partitions |

Datastream-Specific Metrics

  • Datastream-specific metrics prefix: <connectorName>.<connectorTask>.<datastreamName>.

| Metric Name | Description |
| --- | --- |
| errorRate | The rate of errors encountered when data is dispatched for delivery to the destination system |
| eventCountsPerPoll | The distribution (histogram) of the number of records retrieved from Kafka in every poll |
| eventsByteProcessedRate | The rate of bytes processed and dispatched for delivery to destination |
| eventsProcessedRate | The rate of Kafka records processed and dispatched for delivery to destination |
| numAutoPausedPartitionsAwaitingDestTopic | The number of auto-paused topic partitions awaiting destination topic creation |
| numAutoPausedPartitionsOnError | The number of auto-paused topic partitions due to errors encountered during dispatch for delivery |
| numAutoPausedPartitionsOnInFlightMessages | The number of auto-paused topic partitions due to exceeding their maximum in-flight messages thresholds |
| numConfigPausedPartitions | The number of topic partitions paused manually |
| numPartitions | The number of Kafka topic partitions |
| numPolls | The rate of polls performed using the Kafka consumer |
| numProcessingOverThreshold | The number of times dispatching records to destination exceeds processingDelayLogThreshold |
| numTopics | The number of Kafka topics |
| rebalanceRate | The rate of rebalances seen by the Kafka consumer |
| stuckPartitions | The number of stuck topic partitions |
| timeSinceLastEventReceivedMs | The time duration (in milliseconds) since the last non-empty ConsumerRecord was fetched using the Kafka consumer |
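
The three metric prefixes described in this section combine with a metric name as sketched below. This is only an illustration of the naming scheme: the helper functions are hypothetical, and the connector, task, and datastream names in the usage note are placeholders rather than values taken from Brooklin.

```python
def general_metric(connector_name, metric):
    """<connectorName>.<metric>"""
    return f"{connector_name}.{metric}"


def aggregate_metric(connector_name, connector_task, metric):
    """<connectorName>.<connectorTask>.aggregate.<metric>"""
    return f"{connector_name}.{connector_task}.aggregate.{metric}"


def datastream_metric(connector_name, connector_task, datastream_name, metric):
    """<connectorName>.<connectorTask>.<datastreamName>.<metric>"""
    return f"{connector_name}.{connector_task}.{datastream_name}.{metric}"
```

For instance, `aggregate_metric("kafkaConnector", "KafkaConnectorTask", "numPartitions")` returns `kafkaConnector.KafkaConnectorTask.aggregate.numPartitions`, while the datastream-specific variant replaces `aggregate` with the datastream name.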

REST API Data Models

KafkaDatastreamStatesResponse

| Field Name | Type | Description |
| --- | --- | --- |
| datastream | string | Datastream name |
| assignedTopicPartitions | array | Assigned topic partitions |
| autoPausedPartitions | map | Associates each auto-paused topic partition with metadata about the paused partition |
| manualPausedPartitions | map | Associates each topic with a list of manually paused partitions |
| inFlightMessageCounts | map | Associates each topic partition with the number of in-flight messages |

KafkaTopicPartitionStatsResponse

| Field Name | Type | Description |
| --- | --- | --- |
| consumerGroupId | string | Kafka consumer group identifier |
| topicPartitions | map | Associates each topic with a set of assigned partitions |
| datastreams | set | Names of the Datastreams assigned to the Kafka consumer group |
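
A client that consumes KafkaTopicPartitionStatsResponse objects might model them as below. This is a sketch under assumptions: the Python class simply mirrors the three fields in the table, `stats_from_json` is a hypothetical helper, and partitions are assumed to be serialized as integers, which this page does not state.

```python
from dataclasses import dataclass
from typing import Any, Dict, Mapping, Set


@dataclass
class KafkaTopicPartitionStatsResponse:
    consumerGroupId: str
    topicPartitions: Dict[str, Set[int]]  # topic name -> assigned partitions (ints assumed)
    datastreams: Set[str]


def stats_from_json(obj: Mapping[str, Any]) -> KafkaTopicPartitionStatsResponse:
    """Convert one decoded JSON object, turning JSON arrays into sets."""
    return KafkaTopicPartitionStatsResponse(
        consumerGroupId=obj["consumerGroupId"],
        topicPartitions={
            topic: set(partitions)
            for topic, partitions in obj["topicPartitions"].items()
        },
        datastreams=set(obj["datastreams"]),
    )
```

Converting the arrays to sets matches the set semantics given in the table and makes membership checks such as `0 in stats.topicPartitions["my-topic"]` straightforward.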