# Structured Streaming + Kafka Integration Guide

## Reading Data from Kafka

### Creating a Kafka Source for Streaming Queries

```python
# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```

### Creating a Kafka Source for Batch Queries

If your use case is better suited to batch processing, you can create a Dataset/DataFrame for a defined range of offsets.

```python
# Subscribe to 1 topic; the range defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```
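
The explicit offset strings in the batch example are plain JSON, so they can be generated rather than written by hand. The sketch below is a hypothetical helper (`offsets_json` is not part of Spark) that serializes a nested dict and rejects the two sentinel values this guide rules out: `-1` (latest) as a batch starting offset and `-2` (earliest) as an ending offset.

```python
import json

EARLIEST = -2  # sentinel for the earliest offset in the JSON form
LATEST = -1    # sentinel for the latest offset in the JSON form


def offsets_json(offsets, *, ending=False, batch=True):
    """Serialize {topic: {partition: offset}} into the JSON option string.

    Hypothetical helper, not a Spark API. Raises ValueError for -2 in an
    ending position and for -1 in a batch starting position.
    """
    forbidden = EARLIEST if ending else (LATEST if batch else None)
    result = {}
    for topic, partitions in offsets.items():
        result[topic] = {}
        for partition, offset in partitions.items():
            if offset == forbidden:
                kind = "ending" if ending else "batch starting"
                raise ValueError(f"{offset} is not allowed as a {kind} offset")
            # JSON object keys are strings, so partition 0 becomes "0".
            result[topic][str(partition)] = offset
    return json.dumps(result, separators=(",", ":"))


starting = offsets_json({"topic1": {0: 23, 1: EARLIEST}, "topic2": {0: EARLIEST}})
ending = offsets_json({"topic1": {0: 50, 1: LATEST}, "topic2": {0: LATEST}}, ending=True)
print(starting)
print(ending)
```

The two resulting strings can be passed as the values of `startingOffsets` and `endingOffsets` respectively.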

Each row in the source has the following schema:

<table class="table">
<tr><th>Column</th><th>Type</th></tr>
<tr>
  <td>key</td>
  <td>binary</td>
</tr>
<tr>
  <td>value</td>
  <td>binary</td>
</tr>
<tr>
  <td>topic</td>
  <td>string</td>
</tr>
<tr>
  <td>partition</td>
  <td>int</td>
</tr>
<tr>
  <td>offset</td>
  <td>long</td>
</tr>
<tr>
  <td>timestamp</td>
  <td>timestamp</td>
</tr>
<tr>
  <td>timestampType</td>
  <td>int</td>
</tr>
<tr>
  <td>headers (optional)</td>
  <td>array</td>
</tr>
</table>

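In addition to `kafka.bootstrap.servers` and a subscription option such as `subscribe` or `subscribePattern` (both shown in the examples above), the Kafka source accepts the following optional configurations for batch and streaming queries: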

<table class="table">
  <tr>
    <th>Option</th>
    <th>value</th>
    <th>default</th>
    <th>query type</th>
    <th>meaning</th>
  </tr>
  <tr>
    <td>startingOffsetsByTimestamp</td>
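    <td>
      JSON string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000,
      "1": 2000}} """
    </td>
    <td>none (the value of <code>startingOffsets</code> will apply)</td>
    <td>streaming and batch</td>
    <td>
      The starting point, by timestamp, when a query is started: a JSON string specifying a starting timestamp for each TopicPartition. The offset returned for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in that partition.
      If no matching offset exists, the query fails immediately to prevent unintended reads from that partition. (This is a current limitation and is expected to be addressed in the near future.)
      <p />
      Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code> and does not interpret or reason about the value.
      <p />
      For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to the
      <a
        href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-"
        >javadoc</a
      >.
      <p />
      Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>).
      For further details, please refer to the
      <a href="https://kafka.apache.org/documentation/">Kafka documentation</a
      >.
      <p />
      Note: This option requires Kafka 0.10.1.0 or higher.
      <p />
      Note 2: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.
      <p />
      Note 3: For streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off.
      Partitions newly discovered during a query start at earliest.
    </td>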
  </tr>
  <tr>
    <td>startingOffsets</td>
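    <td>
      "earliest", "latest" (streaming only), or JSON string """
      {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
    </td>
    <td>"latest" for streaming, "earliest" for batch</td>
    <td>streaming and batch</td>
    <td>
      The starting point when a query is started: either "earliest" (the earliest offsets), "latest" (the latest offsets), or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 as an offset refers to earliest and -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in the JSON) is not allowed. For streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off. Partitions newly discovered during a query start at earliest.
    </td>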
  </tr>
  <tr>
    <td>endingOffsetsByTimestamp</td>
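    <td>
      JSON string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000,
      "1": 2000}} """
    </td>
    <td>latest</td>
    <td>batch query</td>
    <td>
      The end point, by timestamp, when a batch query is ended: a JSON string specifying an ending timestamp for each TopicPartition. The offset returned for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in that partition. If no matching offset exists, the offset is set to latest.
      <p />
      Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code> and does not interpret or reason about the value.
      <p />
      For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to the
      <a
        href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-"
        >javadoc</a
      >.
      <p />
      Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>).
      For further details, please refer to the
      <a href="https://kafka.apache.org/documentation/">Kafka documentation</a
      >.
      <p />
      Note: This option requires Kafka 0.10.1.0 or higher.
      <p />
      Note 2: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
    </td>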
  </tr>
  <tr>
    <td>endingOffsets</td>
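    <td>latest or JSON string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}</td>
    <td>latest</td>
    <td>batch query</td>
    <td>
      The end point when a batch query is ended: either "latest", which refers to the latest offsets, or a JSON string specifying an ending offset for each TopicPartition.
      In the JSON, -1 as an offset refers to latest; -2 (earliest) is not allowed as an ending offset.
    </td>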
  </tr>
  <tr>
    <td>failOnDataLoss</td>
    <td>true or false</td>
    <td>true</td>
    <td>streaming and batch</td>
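    <td>
      Whether to fail the query when data may have been lost (for example, topics were deleted or offsets are out of range). This may be a false alarm. You can disable it when it does not work as you expect.
    </td>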
  </tr>
  <tr>
    <td>kafkaConsumer.pollTimeoutMs</td>
    <td>long</td>
    <td>512</td>
    <td>streaming and batch</td>
    <td>The timeout, in milliseconds, for polling data from Kafka in executors.</td>
  </tr>
  <tr>
    <td>fetchOffset.numRetries</td>
    <td>int</td>
    <td>3</td>
    <td>streaming and batch</td>
    <td>Number of times to retry before giving up on fetching Kafka offsets.</td>
  </tr>
  <tr>
    <td>fetchOffset.retryIntervalMs</td>
    <td>long</td>
    <td>10</td>
    <td>streaming and batch</td>
    <td>Milliseconds to wait before retrying to fetch Kafka offsets.</td>
  </tr>
  <tr>
    <td>maxOffsetsPerTrigger</td>
    <td>long</td>
    <td>none</td>
    <td>streaming and batch</td>
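    <td>
      Rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets is split proportionally across topicPartitions of different volume.
    </td>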
  </tr>
  <tr>
    <td>minPartitions</td>
    <td>int</td>
    <td>none</td>
    <td>streaming and batch</td>
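    <td>
      Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. If you set this option to a value greater than your number of topicPartitions, Spark splits large Kafka partitions into smaller pieces. Please note that this configuration is like a <code>hint</code>: the number of Spark tasks will be <strong>approximately</strong> <code>minPartitions</code>. It can be less or more depending on rounding errors or Kafka partitions that did not receive any new data.
    </td>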
  </tr>
  <tr>
    <td>groupIdPrefix</td>
    <td>string</td>
    <td>spark-kafka-source</td>
    <td>streaming and batch</td>
    <td>
      Prefix of the consumer group identifiers (<code>group.id</code>) that are generated by Structured Streaming queries. If "kafka.group.id" is set, this option is ignored.
    </td>
  </tr>
  <tr>
    <td>kafka.group.id</td>
    <td>string</td>
    <td>none</td>
    <td>streaming and batch</td>
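    <td>
      The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and can therefore read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data, and you can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the same group id are likely to interfere with each other, causing each query to read only part of the data. This may also occur when queries are started or restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (by setting the option "kafka.session.timeout.ms") to be very small. When this option is set, the option "groupIdPrefix" is ignored.
    </td>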
  </tr>
  <tr>
    <td>includeHeaders</td>
    <td>boolean</td>
    <td>false</td>
    <td>streaming and batch</td>
    <td>Whether to include the Kafka headers in the row.</td>
  </tr>
</table>
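
The timestamp-based options take the same nested JSON shape as the offset options, with a timestamp in place of each offset. The sketch below assumes the values are milliseconds since the Unix epoch, which is the unit `KafkaConsumer.offsetsForTimes` works with; the helper names `to_epoch_millis` and `offsets_by_timestamp_json` are hypothetical and not part of Spark.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time surprises")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def offsets_by_timestamp_json(topic_partitions, moment):
    """Build the option string using one timestamp for every listed partition.

    topic_partitions maps a topic name to an iterable of partition numbers.
    """
    millis = to_epoch_millis(moment)
    payload = {
        topic: {str(partition): millis for partition in partitions}
        for topic, partitions in topic_partitions.items()
    }
    return json.dumps(payload)


start = datetime(2020, 1, 1, tzinfo=timezone.utc)
option_value = offsets_by_timestamp_json({"topicA": [0, 1], "topicB": [0]}, start)
print(option_value)
```

The resulting string would be passed as the value of `startingOffsetsByTimestamp` or `endingOffsetsByTimestamp`.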





### Consumer Caching

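Initializing Kafka consumers is time-consuming, especially in streaming scenarios where processing time is a key factor. For this reason, Spark pools Kafka consumers on executors by leveraging Apache Commons Pool.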

The caching key is built from the following information:
- Topic name
- Topic partition
- Group ID

The following properties are available to configure the consumer pool:

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
  <td>spark.kafka.consumer.cache.capacity</td>
  <td>64</td>
  <td>The maximum number of consumers cached. Please note that it's a soft limit.</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.cache.timeout</td>
  <td>5m (5 minutes)</td>
  <td>The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
  <td>1m (1 minute)</td>
  <td>The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run.</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.cache.jmx.enable</td>
  <td>false</td>
  <td>Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance.
  The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool".
  </td>
  <td>3.0.0</td>
</tr>
</table>

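The size of the pool is limited by `spark.kafka.consumer.cache.capacity`, but this works as a "soft limit" so that Spark tasks are never blocked.

An idle eviction thread periodically removes consumers that have been unused for longer than the given timeout. If the capacity threshold is reached when borrowing, the pool tries to remove the least-used entry that is currently not in use.

If no entry can be removed, the pool keeps growing. In the worst case, the pool grows to the maximum number of concurrent tasks that can run in the executor (that is, the number of task slots).

If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons. At the same time, all consumers in the pool that have the same caching key are invalidated, to remove the consumer that was used in the failed execution. Consumers that other tasks are using are not closed, but they are invalidated as well when they are returned to the pool.

Along with consumers, Spark pools the records fetched from Kafka separately, so that Kafka consumers stay stateless from Spark's point of view and pooling is as efficient as possible. This pool uses the same caching key as the Kafka consumer pool. Note that it does not leverage Apache Commons Pool, because its characteristics are different.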
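
The soft-limit behavior described above can be illustrated with a toy model. This is only a sketch: `SoftLimitPool` is a hypothetical class, not Spark's pool and not Apache Commons Pool, and it makes two simplifying assumptions: each key is borrowed by at most one task at a time, and "least-used" is approximated as "returned longest ago".

```python
from collections import OrderedDict


class SoftLimitPool:
    """Toy soft-capped pool keyed by cache key; not Spark's implementation."""

    def __init__(self, capacity, factory):
        self.capacity = capacity
        self.factory = factory
        self.idle = OrderedDict()  # key -> object, least recently returned first
        self.in_use = {}           # key -> object currently held by a task

    def size(self):
        return len(self.idle) + len(self.in_use)

    def borrow(self, key):
        if key in self.idle:
            obj = self.idle.pop(key)  # reuse the pooled object
        else:
            if self.size() >= self.capacity and self.idle:
                # At the cap: drop the idle entry that was returned longest ago.
                self.idle.popitem(last=False)
            # With nothing idle to drop, the pool grows past the cap
            # rather than blocking the task.
            obj = self.factory(key)
        self.in_use[key] = obj
        return obj

    def give_back(self, key):
        self.idle[key] = self.in_use.pop(key)


pool = SoftLimitPool(capacity=2, factory=lambda key: f"consumer:{key}")
for key in ("topic1-0", "topic1-1", "topic1-2"):
    pool.borrow(key)
size_while_busy = pool.size()  # every entry is in use, so nothing could be evicted

pool.give_back("topic1-0")
pool.borrow("topic1-3")        # evicts the idle "topic1-0" entry before creating
size_after_eviction = pool.size()
```

With three concurrent borrowers and a capacity of two, the pool grows to three entries. Once one entry is idle, the next borrow for a new key evicts it instead of growing further.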

The following properties are available to configure the fetched data pool:

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
  <td>spark.kafka.consumer.fetchedData.cache.timeout</td>
  <td>5m (5 minutes)</td>
  <td>The minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
  <td>1m (1 minute)</td>
  <td>The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.</td>
  <td>3.0.0</td>
</tr>
</table>

## Writing Data to Kafka

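Here, we describe the support for writing streaming queries and batch queries to Apache Kafka. Take note that Apache Kafka only supports at-least-once write semantics. Consequently, when writing either streaming queries or batch queries to Kafka, some records may be duplicated. This can happen, for example, if Kafka needs to retry a message that was not acknowledged by a broker, even though that broker received and wrote the message record. Because of these Kafka write semantics, Structured Streaming cannot prevent such duplicates from occurring. However, if writing the query succeeds, you can assume that the query output was written at least once. A possible way to remove duplicates when reading the written data is to introduce a primary (unique) key that can be used to perform de-duplication when reading.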
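
As a minimal sketch of the de-duplication idea, the snippet below filters records read back from a topic by a unique key embedded in each payload. It is plain Python for illustration; the field name `event_id` and the sample records are hypothetical. In a Spark job, `DataFrame.dropDuplicates` on the key column would play the same role.

```python
def deduplicate(records, key_field="event_id"):
    """Yield each record once, keyed on a unique identifier field.

    `event_id` is a hypothetical field name; any unique key the writer
    embeds in the payload would serve.
    """
    seen = set()
    for record in records:
        key = record[key_field]
        if key in seen:
            continue  # a retried write produced this copy; skip it
        seen.add(key)
        yield record


read_back = [
    {"event_id": "a1", "amount": 10},
    {"event_id": "b2", "amount": 7},
    {"event_id": "a1", "amount": 10},  # duplicate caused by a producer retry
]
unique = list(deduplicate(read_back))
print(unique)
```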

The DataFrame being written to Kafka should have the following columns in its schema:

<table class="table">
<tr><th>Column</th><th>Type</th></tr>
<tr>
  <td>key (optional)</td>
  <td>string or binary</td>
</tr>
<tr>
  <td>value (required)</td>
  <td>string or binary</td>
</tr>
<tr>
  <td>headers (optional)</td>
  <td>array</td>
</tr>
<tr>
  <td>topic (*optional)</td>
  <td>string</td>
</tr>
<tr>
  <td>partition (optional)</td>
  <td>int</td>
</tr>
</table>

\* The topic column is required if the "topic" configuration option is not specified.

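The value column is the only required column. If a key column is not specified, a null-valued key column is added automatically (see the Kafka semantics on how null-valued keys are handled). If a topic column exists, its value is used as the topic when writing the given row to Kafka, unless the "topic" configuration option is set; that is, the "topic" configuration option overrides the topic column. If a "partition" column is not specified (or its value is null), the partition is calculated by the Kafka producer. A Kafka partitioner can be specified in Spark by setting the `kafka.partitioner.class` option. If it is not present, the Kafka default partitioner is used.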
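
The column rules above can be summarized as a small decision function. This is a plain-Python sketch of the rules as stated in this guide, not Spark's code; `resolve_topic` and `normalize_row` are hypothetical names, and rows are modeled as dicts.

```python
def resolve_topic(row, topic_option=None):
    """Pick the destination topic: the option wins over the topic column."""
    if topic_option is not None:
        return topic_option
    topic = row.get("topic")
    if topic is None:
        raise ValueError("no 'topic' option is set and the row has no topic value")
    return topic


def normalize_row(row, topic_option=None):
    """Apply the defaults described above to one row (modeled as a dict)."""
    if "value" not in row:
        raise ValueError("the value column is required")
    return {
        "topic": resolve_topic(row, topic_option),
        "key": row.get("key"),              # missing key column -> null key
        "value": row["value"],
        "partition": row.get("partition"),  # None leaves the choice to the producer
    }


from_option = normalize_row({"topic": "from-column", "value": b"v"}, topic_option="topic1")
from_column = normalize_row({"topic": "from-column", "key": b"k", "value": b"v"})
print(from_option["topic"], from_column["topic"])
```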

The following options must be set for the Kafka sink for both batch and streaming queries.

<table class="table">
<tr><th>Option</th><th>value</th><th>meaning</th></tr>
<tr>
  <td>kafka.bootstrap.servers</td>
  <td>A comma-separated list of host:port</td>
  <td>The Kafka "bootstrap.servers" configuration.</td>
</tr>
</table>

The following configurations are optional:

<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
  <td>topic</td>
  <td>string</td>
  <td>none</td>
  <td>streaming and batch</td>
  <td>Sets the topic that all rows will be written to in Kafka. This option overrides any
  topic column that may exist in the data.</td>
</tr>
<tr>
  <td>includeHeaders</td>
  <td>boolean</td>
  <td>false</td>
  <td>streaming and batch</td>
  <td>Whether to include the Kafka headers in the row.</td>
</tr>
</table>



### Creating a Kafka Sink for Streaming Queries

```python
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()
```

### Writing the Output of Batch Queries to Kafka

```python
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
```

### Producer Caching

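Because a Kafka producer instance is designed to be thread-safe, Spark initializes one Kafka producer instance and shares it across tasks that have the same caching key.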

The caching key is built from the following information:

- Kafka producer configuration

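This includes the configuration for authorization, which Spark includes automatically when a delegation token is being used. Even with authorization taken into account, you can expect the same Kafka producer instance to be used for the same Kafka producer configuration. A different Kafka producer is used when the delegation token is renewed; the Kafka producer instance for the old delegation token is evicted according to the cache policy.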

The following properties are available to configure the producer pool:

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
  <td>spark.kafka.producer.cache.timeout</td>
  <td>10m (10 minutes)</td>
  <td>The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
  <td>2.2.1</td>
</tr>
<tr>
  <td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
  <td>1m (1 minute)</td>
  <td>The interval of time between runs of the idle evictor thread for producer pool. When non-positive, no idle evictor thread will be run.</td>
  <td>3.0.0</td>
</tr>
</table>

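An idle eviction thread periodically removes producers that have been unused for longer than the given timeout. Note that the producer is shared and used concurrently, so the last-used timestamp is determined by the moment the producer instance is returned and its reference count reaches 0.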
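
The reference-counting rule can be pictured with a toy bookkeeping class. This is a sketch only: `SharedProducerEntry` is hypothetical, times are plain numbers in seconds, and treating an entry as evictable once it has been idle for at least the timeout is an assumption drawn from the property description above.

```python
class SharedProducerEntry:
    """Toy bookkeeping for one shared producer; not Spark's implementation."""

    def __init__(self, producer):
        self.producer = producer
        self.ref_count = 0
        self.last_used = None  # set only when the final user returns the producer

    def acquire(self):
        self.ref_count += 1
        return self.producer

    def release(self, now):
        self.ref_count -= 1
        if self.ref_count == 0:
            # Only the last release marks the producer as idle.
            self.last_used = now

    def is_evictable(self, now, timeout):
        if self.ref_count > 0 or self.last_used is None:
            return False
        return now - self.last_used >= timeout


entry = SharedProducerEntry(producer="producer-instance")
entry.acquire()
entry.acquire()          # two tasks share the same producer
entry.release(now=5)     # one task finishes; the producer is still in use
still_busy = entry.last_used is None
entry.release(now=8)     # the last task finishes; the idle clock starts at 8
print(entry.is_evictable(now=600, timeout=600), entry.is_evictable(now=608, timeout=600))
```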

## Kafka Specific Configurations

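Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, for example `stream.option("kafka.bootstrap.servers", "host:port")`. For possible Kafka parameters, see the Kafka consumer config docs for parameters related to reading data, and the Kafka producer config docs for parameters related to writing data.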

Note that the following Kafka params cannot be set, and the Kafka source or sink will throw an exception if they are:

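- **group.id**: The Kafka source creates a unique group id for each query automatically. The user can set the prefix of the automatically generated group.id values via the optional source option `groupIdPrefix`, whose default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a specific group id; however, please read the warnings for that option and use it with caution.
- **auto.offset.reset**: Set the source option `startingOffsets` to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than relying on the Kafka consumer to do it. This ensures that no data is missed when new topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new streaming query is started; resuming always picks up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
- **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
- **value.serializer**: Values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the values into either strings or byte arrays.
- **enable.auto.commit**: The Kafka source does not commit any offsets.
- **interceptor.classes**: The Kafka source always reads keys and values as byte arrays. It is not safe to use ConsumerInterceptor, as it may break the query.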
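
A pre-flight check can catch these parameters before a query is started. The sketch below is a hypothetical helper, not a Spark API: it scans an options dict for `kafka.`-prefixed names from the list above. It deliberately leaves out `group.id`, on the assumption that `kafka.group.id` is being set on purpose as described in its entry.

```python
UNSUPPORTED_KAFKA_PARAMS = {
    "auto.offset.reset",
    "key.deserializer",
    "value.deserializer",
    "key.serializer",
    "value.serializer",
    "enable.auto.commit",
    "interceptor.classes",
}


def find_unsupported(options):
    """Return the option names that use the kafka. prefix with a rejected param."""
    prefix = "kafka."
    return sorted(
        name
        for name in options
        if name.startswith(prefix) and name[len(prefix):] in UNSUPPORTED_KAFKA_PARAMS
    )


user_options = {
    "kafka.bootstrap.servers": "host1:port1,host2:port2",
    "subscribe": "topic1",
    "startingOffsets": "earliest",
    "kafka.auto.offset.reset": "earliest",
    "kafka.enable.auto.commit": "true",
}
rejected = find_unsupported(user_options)
print(rejected)
```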

## Deploying

As with any Spark application, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_2.12` and its dependencies can be added directly to `spark-submit` using `--packages`, such as,

```bash
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...
```

For experimenting on `spark-shell`, you can also use `--packages` to add `spark-sql-kafka-0-10_2.12` and its dependencies directly,

```bash
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 ...
```
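
The package coordinate in these commands encodes the Scala build suffix (`2.12`) and the Spark version (`3.0.1`). The sketch below assembles the coordinate and a `spark-submit` argument list from those two values; the helper names and the application file `my_app.py` are hypothetical.

```python
def kafka_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate for the Kafka source/sink package."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def submit_command(app, spark_version, scala_version="2.12"):
    """Return spark-submit arguments as a list, ready for subprocess.run."""
    return [
        "./bin/spark-submit",
        "--packages",
        kafka_package(spark_version, scala_version),
        app,
    ]


coordinate = kafka_package("3.0.1")
command = submit_command("my_app.py", "3.0.1")  # my_app.py is a placeholder
print(" ".join(command))
```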

<h2 id="security">Security</h2>

<p>Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed
description of these possibilities, see <a href="http://kafka.apache.org/documentation.html#security">Kafka security docs</a>.</p>

<p>It&#8217;s worth noting that security is optional and turned off by default.</p>

<p>Spark supports the following ways to authenticate against Kafka cluster:</p>
<ul>
  <li><strong>Delegation token (introduced in Kafka broker 1.1.0)</strong></li>
  <li><strong>JAAS login configuration</strong></li>
</ul>

<h3 id="delegation-token">Delegation token</h3>

<p>This way the application can be configured via Spark parameters and may not need JAAS login
configuration (Spark can use Kafka&#8217;s dynamic JAAS configuration feature). For further information
about delegation tokens, see <a href="http://kafka.apache.org/documentation/#security_delegation_token">Kafka delegation token docs</a>.</p>

<p>The process is initiated by Spark&#8217;s Kafka delegation token provider. When <code class="highlighter-rouge">spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code> is set,
Spark considers the following log in options, in order of preference:</p>
<ul>
  <li><strong>JAAS login configuration</strong>, please see example below.</li>
  <li>
    <p><strong>Keytab file</strong>, such as,</p>

    <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-submit \
    --keytab &lt;KEYTAB_FILE&gt; \
    --principal &lt;PRINCIPAL&gt; \
    --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=&lt;KAFKA_SERVERS&gt; \
    ...
</code></pre></div>    </div>
  </li>
  <li>
    <p><strong>Kerberos credential cache</strong>, such as,</p>

    <div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-submit \
    --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=&lt;KAFKA_SERVERS&gt; \
    ...
</code></pre></div>    </div>
  </li>
</ul>

<p>The Kafka delegation token provider can be turned off by setting <code class="highlighter-rouge">spark.security.credentials.kafka.enabled</code> to <code class="highlighter-rouge">false</code> (default: <code class="highlighter-rouge">true</code>).</p>

<p>Spark can be configured to use the following authentication protocols to obtain the token (the protocol must match the
Kafka broker configuration):</p>
<ul>
  <li><strong>SASL SSL (default)</strong></li>
  <li><strong>SSL</strong></li>
  <li><strong>SASL PLAINTEXT (for testing)</strong></li>
</ul>

<p>After obtaining a delegation token successfully, Spark distributes it across nodes and renews it accordingly.
A delegation token uses the <code class="highlighter-rouge">SCRAM</code> login module for authentication, so the appropriate
<code class="highlighter-rouge">spark.kafka.clusters.${cluster}.sasl.token.mechanism</code> (default: <code class="highlighter-rouge">SCRAM-SHA-512</code>) has to be configured. This parameter
must also match the Kafka broker configuration.</p>

<p>When delegation token is available on an executor Spark considers the following log in options, in order of preference:</p>
<ul>
  <li><strong>JAAS login configuration</strong>, please see example below.</li>
  <li><strong>Delegation token</strong>, please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code> parameter for further details.</li>
</ul>

<p>When none of the above applies, an unsecure connection is assumed.</p>

<h4 id="configuration">Configuration</h4>

<p>Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code> is an arbitrary unique identifier which helps to group different configurations.</p>

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td>
    <td>None</td>
    <td>
      A comma-separated list of host/port pairs to use for establishing the initial connection
      to the Kafka cluster. For further details please see Kafka documentation. Only used to obtain delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td>
    <td>.*</td>
    <td>
      Regular expression to match against the <code>bootstrap.servers</code> config for sources and sinks in the application.
      If a server address matches this regex, the delegation token obtained from the respective bootstrap servers will be used when connecting.
      If multiple clusters match the address, an exception will be thrown and the query won't be started.
      Kafka's secure and unsecure listeners are bound to different ports. When both are used, the secure listener port has to be part of the regular expression.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
    <td>SASL_SSL</td>
    <td>
      Protocol used to communicate with brokers. For further details please see Kafka documentation. Protocol is applied on all the sources and sinks as default where
      <code>bootstrap.servers</code> config matches (for further details please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code>),
      and can be overridden by setting <code>kafka.security.protocol</code> on the source or sink.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td>
    <td>kafka</td>
    <td>
      The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.
      For further details please see Kafka documentation. Only used to obtain delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
    <td>None</td>
    <td>
      The location of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td>
    <td>None</td>
    <td>
      The store password for the trust store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code> is configured.
      For further details please see Kafka documentation. Only used to obtain delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
    <td>None</td>
    <td>
      The location of the key store file. This is optional for client and can be used for two-way authentication for client.
      For further details please see Kafka documentation. Only used to obtain delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td>
    <td>None</td>
    <td>
      The store password for the key store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is configured.
      For further details please see Kafka documentation. Only used to obtain delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td>
    <td>None</td>
    <td>
      The password of the private key in the key store file. This is optional for client.
      For further details please see Kafka documentation. Only used to obtain delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
  <tr>
    <td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td>
    <td>SCRAM-SHA-512</td>
    <td>
      SASL mechanism used for client connections with delegation token. Because the SCRAM login module is used for authentication, a compatible mechanism has to be set here.
      For further details please see Kafka documentation (<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker with delegation token.
    </td>
    <td>3.0.0</td>
  </tr>
</table>
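
<p>The matching rule for <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code> can be sketched in plain Python. This is an illustration, not Spark's code: it assumes each address in the comma-separated <code>bootstrap.servers</code> value is tested with a full match, and the cluster names and host names are hypothetical.</p>

```python
import re


def select_cluster(bootstrap_servers, cluster_regexes):
    """Return the cluster whose regex matches an address, or None.

    cluster_regexes maps a cluster identifier to its target regex.
    Raises ValueError when more than one cluster matches.
    """
    addresses = [a.strip() for a in bootstrap_servers.split(",") if a.strip()]
    matched = [
        cluster
        for cluster, pattern in cluster_regexes.items()
        if any(re.fullmatch(pattern, address) for address in addresses)
    ]
    if len(matched) > 1:
        raise ValueError(f"multiple clusters match {bootstrap_servers!r}: {matched}")
    return matched[0] if matched else None


regexes = {
    "east": r"kafka-east-\d+:9093",  # 9093 stands in for the secure listener port
    "west": r"kafka-west-\d+:9093",
}
chosen = select_cluster("kafka-east-1:9093,kafka-east-2:9093", regexes)
unmatched = select_cluster("other-host:9092", regexes)
print(chosen, unmatched)
```

<p>Including the secure port in each pattern keeps addresses of the unsecure listener from matching, and keeping the patterns disjoint avoids the multiple-match error.</p>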

<h4 id="kafka-specific-configurations-1">Kafka Specific Configurations</h4>

<p>Kafka&#8217;s own configurations can be set with <code class="highlighter-rouge">kafka.</code> prefix, e.g, <code class="highlighter-rouge">--conf spark.kafka.clusters.${cluster}.kafka.retries=1</code>.
For possible Kafka parameters, see <a href="http://kafka.apache.org/documentation.html#adminclientconfigs">Kafka adminclient config docs</a>.</p>

<h4 id="caveats">Caveats</h4>

<ul>
  <li>Obtaining delegation token for proxy user is not yet supported (<a href="https://issues.apache.org/jira/browse/KAFKA-6945">KAFKA-6945</a>).</li>
</ul>

<h3 id="jaas-login-configuration">JAAS login configuration</h3>

<p>JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka cluster.
This makes it possible to apply any custom authentication logic, at a higher maintenance cost.
This can be done in several ways. One possibility is to provide additional JVM parameters, such as,</p>

<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-submit \
    --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
    ...
</code></pre></div></div>
    
