In [1]:
import os 
packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import to_json, struct, lit
sc = SparkContext('local')
spark = SparkSession(sc)

In [2]:
!pip install kafka-python



# Đọc dữ liệu từ Kafka

## 1. Tạo Kafka Source bằng truy vấn tập hợp (Batch queries)

Cụ thể, trong các trường hợp cần truy vấn tập hợp, bạn có thể tạo một Dataset/DataFrame để xác định khoảng offset phù hợp.

### Subcribe mặc định một topic  tại offset bắt đầu và offset kết thúc:
Khi không chỉ định offset cụ thể, sẽ mặc định subcribe topic tại offset bắt đầu và offset kết thúc.

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()\
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [3]:
df = spark\
.read\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "default_topic")\
.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(2, False)

+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key|value                                                                                                                                                                                                                                                                                                                                 |
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|

Định dạng dữ liệu trong schema:

| Cột | Kiểu dữ liệu |
| :--- | :--- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
| headers (optional) | array |


In [4]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Subcribe tới nhiều topic với offset được chỉ định cụ thể:

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()\
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [5]:
df = spark\
.read\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "default_topic,default_topic")\
.option("startingOffsets", """{"default_topic":{"0":23,"1":-2},"default_topic":{"0":23}}""")\
.option("endingOffsets", """{"default_topic":{"0":50,"1":-1},"default_topic":{"0":50}}""")\
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(2, False)

+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key|value                                                                                                                                                                                                                                                                                                                       |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|23 |{"_c0": 23, "age": 43, "wo

In [6]:
df.selectExpr("offset").show(40, False)

+------+
|offset|
+------+
|23    |
|24    |
|25    |
|26    |
|27    |
|28    |
|29    |
|30    |
|31    |
|32    |
|33    |
|34    |
|35    |
|36    |
|37    |
|38    |
|39    |
|40    |
|41    |
|42    |
|43    |
|44    |
|45    |
|46    |
|47    |
|48    |
|49    |
+------+



### Subscribe một pattern tại offset bắt đầu và offset kết thúc:

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load() \
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [6]:
df = spark\
.read\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribePattern", "topic.*")\
.option("startingOffsets", "earliest")\
.option("endingOffsets", "latest")\
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(2, False)

+---+-----+
|key|value|
+---+-----+
+---+-----+



## 2. Tạo Kafka Source bằng truy vấn luồng (Streaming queries)

### Subcribe một topic:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()\
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [2]:
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "default_topic")\
.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

DataFrame[key: string, value: string]

In [3]:
df.writeStream\
    .queryName("activity").format("memory")\
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x7fdee60074c0>

In [4]:
from time import sleep
for x in range(5):
    spark.sql("SELECT count(*) FROM activity").show()
    sleep(1)

+--------+
|count(1)|
+--------+
|       0|
+--------+

+--------+
|count(1)|
+--------+
|    4751|
+--------+

+--------+
|count(1)|
+--------+
|    4751|
+--------+

+--------+
|count(1)|
+--------+
|    4751|
+--------+

+--------+
|count(1)|
+--------+
|  179916|
+--------+



### Subcribe một topic có header:

val df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()\
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

In [None]:
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "default_topic")\
.option("includeHeaders", "true") \
.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

### Subcribe nhiều topic:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()\
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [5]:
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribe", "default_topic, default_topic")\
.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

DataFrame[key: string, value: string]

### Subscribe một pattern:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()\
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [6]:
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafka:9092")\
.option("subscribePattern", "topic.*")\
.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

DataFrame[key: string, value: string]

Các option cần thiết lập cho Kafka Source trong cả streaming queries và batch queries:

| Option | Giá trị | Ý nghĩa |
| :--- | :--- | :--- |
| assign | json string {"topicA":[0,1],"topicB":[2,4]} | Chỉ định TopicPartition dùng để subcribe một topic (Do topic có thể có nhiều partition). Các pattern được sử dụng để subcribe một hoặc nhiều topic. Chỉ một trong số các option "asign", "subcribe" hoặc "subcribePattern" được chỉ định khi subcribe một topic trong Kafka source. |
| subscribe | Các topic được ngăn cách bởi các dấu phẩy | Các pattern được sử dụng để subcribe một hoặc nhiều topic. Chỉ một trong số các option "asign", "subcribe" hoặc "subcribePattern" được chỉ định khi subcribe một topic trong Kafka source. |
| subscribePattern | Chuỗi biểu thức chính quy Java | Các pattern được sử dụng để subcribe một hoặc nhiều topic. Chỉ một trong số các option "asign", "subcribe" hoặc "subcribePattern" được chỉ định khi subcribe một topic trong Kafka source. |
| kafka.bootstrap.servers | Các host:port được ngăn cách bởi các dấu phẩy | Cấu hình Kafka server |

Những thông số cấu hình sau là không bắt buộc:

| Option | Giá trị | Mặc định | Kiểu query | Ý nghĩa |
| :--- | :--- | :--- | :--- | :--- |
| startingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | none (mặc định giá trị của startingOffsets được sử dụng) | streaming query, batch query | Timestamp ghi nhận mốc bắt đầu khi câu truy vấn được thực hiện, mỗi TopicPartition được gán một chuỗi json biểu diễn timestamp bắt đầu. Giá trị offset trả về của mỗi partition là giá trị offset đầu tiên có timestamp lớn hơn hoặc bằng timestamp của partition tương ứng. Nếu không tồn tại offset nào phù hợp, câu truy vấn sẽ dừng ngay lập tức để ngăn chặn việc đọc dữ liệu không chủ đích từ những partition không có offset thỏa mãn. (Hiện tại thì option này còn tồn tại nhiều hạn chế, tuy nhiên sẽ được giải quyết trong tương lai gần). <br>Sau đó, Spark chỉ đơn giản truyền thông tin về timestamp tới KafkaConsumer.offsetsForTimes, không chú thích gì thêm về giá trị gửi đi. <br>Nếu cần thêm thông tin chi tiết về KafkaConsumer.offsetsForTimes, tham khảo thêm [javadoc](https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-). <br>Ý nghĩa của timestamp cũng tùy thuộc vào cấu hình của Kafka (log.message.timestamp.type): tham khảo thêm [Kafka documentation](https://kafka.apache.org/documentation/) để biết thêm chi tiết. <br>Lưu ý: Option này yêu cầu Kafka 0.10.1.0 hoặc cao hơn. <br>Lưu ý 2: startingOffsetsByTimestamp có mức ưu tiên cao hơn startingOffsets.<br>Lưu ý 3: Đối với truy vấn luồng, option này chỉ áp dụng khi câu lệnh truy vấn hoạt động, hoặc bắt đầu lại từ chỗ truy vấn dừng. Một partition mới thêm vào lúc truy vấn sẽ bắt đầu với offset đầu tiên. |
| startingOffsets | "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | "latest" với truy vấn luồng, "earliest" cho truy vấn tập hợp | streaming query, batch query | Bắt đầu khi câu truy vấn bắt đầu, hoặc dùng "earliest" của offset bắt đầu, hoặc "lastest" của offset kết thúc, hoặc dùng json chỉ định offset bắt đầu của từng TopicPartition. Trong json, -2 chỉ earliest, -1 chỉ lastest. Lưu ý: Truy vấn tập hợp không cho phép dùng lastest ( tương ứng -1 trong json). Đối với truy vấn luồng, chỉ áp dụng khi truy vấn bắt đầu, và bắt đầu lại từ chỗ truy vấn dừng. Một partition mới thêm vào lúc truy vấn sẽ bắt đầu với offset đầu tiên. |
| endingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | latest | batch query | Timestamp ghi nhận mốc bắt đầu khi câu truy vấn tập hợp được thực hiện, mỗi TopicPartition được gán một chuỗi json biểu diễn timestamp bắt đầu. Giá trị offset trả về của mỗi partition là giá trị offset đầu tiên có timestamp lớn hơn hoặc bằng timestamp của partition tương ứng. Nếu không tồn tại offset nào phù hợp, offset được set là lastest.<br>Sau đó, Spark chỉ đơn giản truyền thông tin về timestamp tới KafkaConsumer.offsetsForTimes, không chú thích gì thêm về giá trị gửi đi. <br>Nếu cần thêm thông tin chi tiết về KafkaConsumer.offsetsForTimes, tham khảo thêm [javadoc](https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-). <br>Ý nghĩa của timestamp cũng tùy thuộc vào cấu hình của Kafka (log.message.timestamp.type): tham khảo thêm [Kafka documentation](https://kafka.apache.org/documentation/) để biết thêm chi tiết. <br>Lưu ý: Option này yêu cầu Kafka 0.10.1.0 hoặc cao hơn. <br>Lưu ý 2: endingOffsetsByTimestamp có mức ưu tiên cao hơn endingOffsets. |
| endingOffsets | latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | batch query | Kết thúc khi câu truy vấn tập hợp kết thúc, dùng "lastest" của offset kết thúc, hoặc dùng json để chỉ định offset kết thúc của từng TopicPartition. Trong json, -1 chỉ lastest, -2 (tương ứng với earliest) không được dùng. |
| failOnDataLoss | true hoặc false | true | streaming query, batch query | Liệu có xảy ra mất mát dữ liệu khi câu truy vấn không thực hiện thành công (Ví dụ:  topic bị xóa, hoặc khoảng offset không hợp lệ). Nhưng option này cũng có thể hoạt động sai lệch. Bạn có thể không dùng option này nếu nó hoạt động không như mong đợi. |
| kafkaConsumer.pollTimeoutMs | long | 512 | streaming query, batch query | Timeout tính theo milisecond khi thực hiện thăm dò dữ liệu từ Kafka. | 
| fetchOffset.numRetries | int | 3 | streaming query, batch query | Số lần thử lại trước khi dừng việc lấy ra Kafka offset. | 
| fetchOffset.retryIntervalMs | long | 10 | streaming query, batch query | Thời gian đợi tính theo milisecond trước khi thử lại việc lấy ra Kafka offset. | 
| maxOffsetsPerTrigger | long | none | streaming query, batch query | Giới hạn số lượng tối đa các offset được xử lý mỗi lần hoạt động. Một số lượng xác định các offset được phân chia theo tỉ lệ giữa các topicPartition có khối lượng khác nhau. | 
| minPartitions | int | none | streaming query, batch query | Số lượng nhỏ nhất partition mong muốn khi đọc dữ liệu từ Kafka. Spark có ánh xạ 1 - 1 từ topicPartition tới Spark partition khi subcribe từ Kafka. Nếu bạn chọn một giá trị lớn hơn số topicPartition hiện có, Spark sẽ chia những partition lớn thành những phần nhỏ hơn. Lưu ý rằng thiết lập này có nghĩa là số tác vụ của Spark sẽ xấp xỉ số minPartitions. Nhưng có thể ít hoặc nhiều hơn tùy thuộc vào các lỗi có thể gặp phải hoặc Kafka partition không nhận được bất kỳ dữ liệu mới nào. |
| groupIdPrefix | string | spark-kafka-source | streaming query, batch query | Tiền tố của định danh nhóm consumer (group.id) được tạo ra khi thực hiện truy vấn luồng có cấu trúc. Nếu "kafka.group.id" được dùng, thì option này sẽ bị bỏ qua. |
| kafka.group.id | string | none | streaming query, batch query | Kafka group id được dùng trong Kafka consumer khi đọc dữ liệu từ Kafka. Cẩn trọng khi dùng option này. Mặc định, mỗi truy vấn sẽ tạo ra một group id riêng biệt để đọc dữ liệu. Điều này đảm bảo rằng mỗi Kafka source có một consumer group riêng và không bị ảnh hưởng bởi các consumer khác, từ đó có thể đọc được tất cả các partition của topic mà nó subcribe. Trong một vài trường hợp (ví dụ: xác thực Kafka theo nhóm), bạn có thể muốn dùng một nhóm group id đã xác thực để đọc dữ liệu. Bạn có thể tùy ý thiết lập cho group id. Tuy nhiên, cần cẩn trọng bởi nó có thể tạo ra những kết quả không như mong muốn. Truy vấn đồng thời (cả truy vấn luồng và truy vấn tập hợp) hoặc các source với cùng group id có thể gây ra tình trạng lẫn lộn và kết quả là chỉ đọc được một phần dữ liệu. Tình trạng này cũng có thể xảy ra khi truy vấn nhiều hoặc truy vấn lại liên tục. Để giảm thiểu tối đa tình trạng này, thiết lập Kafka consumer session timeout (bằng option "kafka.session.timeout.ms") vô cùng bé. Khi Kafka session timeout được thiết lập, nó sẽ bỏ qua option "groupIdPrefix". | 
| includeHeaders | boolean | false | streaming query, batch query | Xác định liệu có Kafka header trong các hàng. |

# Bộ nhớ đệm cho Consumer (Consumer Caching)

Quá trình khởi tạo những Kafka consumer là một quá trình tốn thời gian, điều này càng trở nên chính xác trong bối cảnh xử lý trực tiếp theo dòng truyền, khi mà thời gian là nhân tố mang tính quyết định. Chính bởi điều này, Spark tập trung các consumer cho quá trình xử lý bằng cách tận dụng Apache Commons Pool.
Chìa khoá cho bộ đệm được tạo ra từ các thông tin sau:
- Topic name.
- Topic partition.
- Group ID.

Để cấu hình cho consumer pool, có thể sử dụng các thông số sau:
    
|       Tên thuộc tính      | Giá trị mặc định |                                   Ý nghĩa                    | Khả dụng từ phiên bản |
|:--------------------------|:----------------:|:-------------------------------------------------------------|:-------------:|
| spark.kafka.consumer.cache.capacity                 |        64        | Số lượng tối đa consumer có thể được lưu vào bộ đệm. **Lưu ý đây là giới hạn mềm.**                                                                                                                                         |         3.0.0         |
| spark.kafka.consumer.cache.timeout                  |    5m (5 phút)   | Khoảng thời gian tối thiểu để một consumer trong trạng thái nghỉ ngơi trước khi có thể bị đẩy ra khỏi bộ đệm bởi chương trình.                                                                                              |         3.0.0         |
| spark.kafka.consumer.cache.evictorThreadRunInterval |    1m (1 phút)   | Chu kì giữa hai lần chạy của tiến trình kiểm tra trạng thái nghỉ trong consumer pool. Khi không ở trạng thái tích cực, sẽ không có tiến trình kiểm tra nào chạy.                                                          |         3.0.0         |
| spark.kafka.consumer.cache.jmx.enable               |       false      | Bật hoặc tắt JMX cho những pool được khởi tạo với trạng thái cấu hình này. Số liệu thống kê về các pool có sẵn thông qua đối tượng JMX. Tiền tố của tên JMX được thiết lập là "kafka010-cached-simple-kafka-consumer-pool". |         3.0.0         |

Kích thước của pool bị giới hạn bởi spark.kafka.consumer.cache.capacity, tuy nhiên nó hoạt động như một "giới hạn mềm" để không block các tác vụ của Spark.

Các tiến trình kiểm tra trạng thái nghỉ định kì xoá các consumer không được sử dụng trong thời gian dài hơn thời gian timeout. Nếu ngưỡng này đạt tới, nó sẽ cố để xoá mục ít sử dụng nhất mà hiện đang không được sử dụng.

Nếu không thể xoá, khi đó pool tiếp tục tăng lên. Trường hợp tệ nhất xảy đến khi pool phình to bằng với số tác vụ tối đa có thể chạy trên luồng thực thi (gọi là số vị trí tác vụ).

Nếu tác vụ thất bại bởi bất kì lý do gì, tác vụ mới sẽ được thực thi với việc tạo mới Kafka consumer nhằm đảm bảo tính an toàn của chương trình. Trong cùng một thời điểm, ta vô hiệu hoá tất cả các consumers trong pool mà có chung khoá caching để loại bỏ tất cả các consumer được sử dụng trong tiến trình thực thi đã bị hỏng.

Cùng với consumer, Spark tập trung các bản ghi lấy được từ Kafka một cách riêng biệt để cho các Kafka consumer không trạng thái từ góc nhìn của Spark cũng như tối đa hoá hiệu quả của việc tập trung. Nó tận dụng được cùng một khoá cache với pool dành cho Kafka consumer. **Lưu ý rằng nó không tận dụng Apache Commons Pool do sự khác nhau về các dặc trưng.**

Bảng sau nêu ra các thuộc tính có thể được cấu hình để lấy dữ liệu từ data pool nói trên:
    
| Tên thuộc tính                                   | Giá trị mặc định | Ý nghĩa                                                                           | Khả dụng từ phiên bản |
|:--------------------------------------------------|:------------|:-----------------------------------------------------------------------------------------|:--------------|
| spark.kafka.consumer.fetchedData.cache.timeout                  | 5m (5 phút)      | Khoảng thời gian tối thiểu một dữ liệu đã được lấy về có thể ở trong pool trước khi có thể bị loại bỏ                                                                | 3.0.0                 |
| spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m (1 phút)      | Khoảng thời gian giữa các lần chạy của tiến trình kiểm tra trạng thái nghỉ trên data pool. Khi không ở trạng thái tích cực, sẽ không có tiến trình kiểm tra nào chạy | 3.0.0                 |