In [None]:
from kafka import KafkaConsumer
import json

# Create a Kafka consumer for 'mysql_topic'. With no committed position it starts
# from the earliest message, and each value is decoded from UTF-8 JSON.
consumer = KafkaConsumer('mysql_topic',
                         bootstrap_servers=['0.0.0.0:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# Number of records to print before stopping.
MAX_MESSAGES = 2

try:
    # Iterating the consumer blocks until a message arrives (no consumer_timeout_ms
    # is set), so this cell waits until MAX_MESSAGES records have been received.
    for count, message in enumerate(consumer, start=1):
        print(message.value)
        if count >= MAX_MESSAGES:
            break
finally:
    # Release the consumer's network connections even if printing/decoding fails.
    consumer.close()

In [None]:
from kafka import KafkaProducer
import json
import pymysql.cursors

# ********************************************* Begin *********************************************

import os

# Seconds to wait for each Kafka delivery confirmation after flushing.
SEND_TIMEOUT_S = 10

# Maps MySQL column names (student table) to the JSON field names sent to Kafka.
FIELD_MAP = {'sno': 'sno', 'sname': 'name', 'ssex': 'sex', 'sage': 'age'}

# Connect to Kafka; each value is serialized to JSON and encoded as UTF-8.
producer = KafkaProducer(
    bootstrap_servers=['0.0.0.0:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

try:
    # Connect to MySQL. DictCursor returns each row as a dict keyed by column name.
    # The password may be supplied via MYSQL_PASSWORD so it need not live in the
    # notebook; the previous hard-coded value remains the fallback.
    connection = pymysql.connect(
        host='0.0.0.0',
        user='root',
        password=os.environ.get('MYSQL_PASSWORD', '123123'),
        database='school',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT sno, sname, ssex, sage FROM student")
            results = cursor.fetchall()
    finally:
        # All rows are already fetched, so the DB connection can be released now.
        connection.close()

    # Rename the columns and publish one message per student row to 'mysql_topic'.
    futures = []
    for row in results:
        mapped_row = {field: row[column] for column, field in FIELD_MAP.items()}
        futures.append(producer.send('mysql_topic', mapped_row))
        print(f"Sent to Kafka: {mapped_row}")

    # Push all buffered messages out, then surface any delivery failure instead of
    # silently dropping it (future.get re-raises the send error).
    producer.flush()
    for future in futures:
        future.get(timeout=SEND_TIMEOUT_S)
finally:
    # Always release the producer, even if MySQL or Kafka raised.
    producer.close()

# ********************************************* End *********************************************

In [None]:
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
import json

# *************************************** Begin *****************************************
class Consumer():
    """Kafka consumer that prints messages and commits their offsets manually.

    Call ``get_connect()`` to create the underlying KafkaConsumer (or let
    ``beginConsumer()`` do it), then ``beginConsumer()`` to print and commit
    messages from ``self.topic``.
    """

    def __init__(self, topic='json_topic', bootstrap_servers='0.0.0.0:9092',
                 group_id='commit_consumer_group'):
        """Store connection settings; no network I/O happens here.

        Args:
            topic: Kafka topic to subscribe to.
            bootstrap_servers: Kafka broker address(es).
            group_id: consumer group used to store committed offsets.
        """
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        # kafka-python's KafkaConsumer.commit() requires a group_id (it asserts
        # group_id is not None), so manual commits fail without one.
        self.group_id = group_id
        self.consumer = None

    def get_connect(self):
        """Create the KafkaConsumer and subscribe it to ``self.topic``."""
        self.consumer = KafkaConsumer(
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id,
            enable_auto_commit=False,  # offsets are committed explicitly in beginConsumer()
            auto_offset_reset='earliest',  # with no committed offset, start from the earliest message
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # decode UTF-8 JSON values
        )
        self.consumer.subscribe([self.topic])

    def beginConsumer(self, max_messages=1):
        """Print up to ``max_messages`` messages, committing each offset manually.

        Connects first if ``get_connect()`` has not been called. Iteration blocks
        until messages arrive. The consumer is always closed afterwards (even if
        printing or committing raises), and ``self.consumer`` is reset to None so
        a later call reconnects instead of reusing a closed consumer.

        Args:
            max_messages: number of messages to consume before stopping; must be
                at least 1 (default 1).

        Raises:
            ValueError: if ``max_messages`` is less than 1.
        """
        if max_messages < 1:
            raise ValueError("max_messages must be >= 1")
        if self.consumer is None:
            self.get_connect()
        try:
            for consumed, message in enumerate(self.consumer, start=1):
                print(f"接收到的数据: {message.value}")

                # Commit the position of the *next* record to read (offset + 1)
                # for the partition this message came from.
                topic_partition = TopicPartition(message.topic, message.partition)
                offset = OffsetAndMetadata(message.offset + 1, None)
                self.consumer.commit({topic_partition: offset})

                if consumed >= max_messages:
                    break
        finally:
            self.consumer.close()
            self.consumer = None
# *************************************** End *****************************************

# Entry point: build the consumer, connect it to Kafka, then consume and commit.
c = Consumer()
c.get_connect()    # create the KafkaConsumer and subscribe it to the topic
c.beginConsumer()  # print a message, commit its offset, then close the consumer

In [None]:
from kafka import KafkaProducer
import json  # 引入模块

# ********************************************* Begin ************************************

# Seconds to wait for the Kafka delivery confirmation after flushing.
SEND_TIMEOUT_S = 10

# Load the payload from data.json. The encoding is explicit so the file is decoded
# as UTF-8 regardless of the platform's default locale encoding.
with open('data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)  # parse into a Python object

# Connect to Kafka; values are serialized to JSON and encoded as UTF-8.
producer = KafkaProducer(
    bootstrap_servers='0.0.0.0:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

try:
    # Send the whole parsed document as a single message to 'json_topic'.
    future = producer.send('json_topic', data)

    print(f"发送的数据: {data}")

    # Push the buffered message out, then re-raise any delivery error instead of
    # losing it silently.
    producer.flush()
    future.get(timeout=SEND_TIMEOUT_S)
finally:
    # Always release the producer, even if sending failed.
    producer.close()

# ********************************************* End ************************************

生产者程序为 commit_producer.py，消费者程序为 commit_consumer.py。
两道题中，代码块一为生产者程序，代码块二为消费者程序。注意：本笔记本中消费者单元格排在生产者单元格之前，但应先运行生产者写入消息，再运行消费者；否则消费者在主题中没有消息时会一直阻塞等待。