In [3]:
import json
from kafka import KafkaProducer, KafkaConsumer
import mysql.connector

# Kafka configuration: topic name and broker address shared by the producer
# and the consumer defined below.
TOPIC = 'student_data'
KAFKA_SERVER = 'localhost:9092'

# Database connection, opened once when this module/cell is executed and
# reused by fetch_data().
# NOTE(review): the credentials are hard-coded here; consider loading them
# from the environment or a config file before sharing this code.
connection = mysql.connector.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school',
    charset='utf8mb4'
)

def fetch_data():
    """Read every row of the ``Student`` table and return them as dicts.

    Each row is paired positionally with the fixed key tuple below, so the
    result is a list of dicts keyed by ``sno``, ``sname``, ``ssex``, ``sage``
    and ``sdept``.

    NOTE(review): the query is ``SELECT *``, so the mapping relies on the
    table's column order matching the key tuple — confirm against the schema.

    Returns:
        list[dict]: one dict per row, in the order the cursor returned them.
    """
    column_keys = ('sno', 'sname', 'ssex', 'sage', 'sdept')

    with connection.cursor() as cur:
        cur.execute("SELECT * FROM Student")
        rows = cur.fetchall()

    records = []
    for row in rows:
        # zip() stops at the shorter sequence, so extra columns are dropped.
        records.append(dict(zip(column_keys, row)))
    return records

# Kafka producer
def send_to_kafka(data):
    """Publish every record in ``data`` to the ``TOPIC`` Kafka topic.

    Each record is serialized with ``json.dumps`` and encoded as UTF-8 before
    being handed to the producer. A line is printed for every record queued.

    Args:
        data: iterable of JSON-serializable records (e.g. the dicts returned
            by ``fetch_data``).

    Raises:
        Any exception raised by the producer or by JSON serialization is
        propagated; the producer is closed in either case.
    """
    producer = KafkaProducer(
        bootstrap_servers=[KAFKA_SERVER],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    try:
        for record in data:
            producer.send(TOPIC, record)
            print(f"发送到 Kafka: {record}")
        # send() only queues the record; flush so everything queued above is
        # actually handed to the broker before we shut the producer down.
        producer.flush()
    finally:
        # Always release the producer, even if a record fails to
        # serialize or send part-way through the loop.
        producer.close()

# Kafka consumer
def consume_from_kafka():
    """Consume messages from ``TOPIC`` and print each decoded value.

    Message payloads are decoded as UTF-8 JSON. Iterating the consumer blocks
    waiting for new messages, so this function normally only ends when an
    exception (such as ``KeyboardInterrupt``) interrupts the loop; the
    consumer is closed before that exception propagates to the caller.

    NOTE(review): no ``group_id`` is passed. Per the kafka-python docs offset
    commits are disabled when ``group_id`` is None, so ``enable_auto_commit``
    likely has no effect and every run re-reads from the earliest offset —
    confirm this is intended.
    """
    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=[KAFKA_SERVER],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        auto_offset_reset='earliest',
        enable_auto_commit=True
    )
    try:
        for message in consumer:
            print(f"从 Kafka 接收: {message.value}")
    finally:
        # The loop is only left via an exception (e.g. Ctrl+C) or a consumer
        # timeout, so this is the one place the consumer can be released.
        consumer.close()

if __name__ == "__main__":
    # Pipeline: MySQL -> Kafka -> stdout.
    try:
        # 1. Read the student rows from the database.
        student_data = fetch_data()

        # 2. Publish them to Kafka.
        send_to_kafka(student_data)

        # 3. Read them back from Kafka and print them (blocks until interrupted).
        consume_from_kafka()
    finally:
        # Step 3 normally ends via an interrupt, so release the module-level
        # database connection here rather than after the last call.
        connection.close()


发送到 Kafka: {'sno': '202205325113', 'sname': '李卓凡', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
发送到 Kafka: {'sno': '202205325114', 'sname': '童遵员', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
发送到 Kafka: {'sno': '202205325115', 'sname': '王孔昊', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
发送到 Kafka: {'sno': '202205325116', 'sname': '王译史', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
发送到 Kafka: {'sno': '202205325117', 'sname': '同学4', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
从 Kafka 接收: {'sno': '202205325113', 'sname': '李卓凡', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
从 Kafka 接收: {'sno': '202205325114', 'sname': '童遵员', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
从 Kafka 接收: {'sno': '202205325115', 'sname': '王孔昊', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
从 Kafka 接收: {'sno': '202205325116', 'sname': '王译史', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}
从 Kafka 接收: {'sno': '202205325117', 'sname': '同学4', 'ssex': '男', 'sage': 21, 'sdept': 'CS'}


KeyboardInterrupt: 