In [1]:
"""
Kafka 프로듀서에서 메시지를 생성하고, 컨슈머에서 `fetch.min.bytes` 및 `max.poll.records` 값을 변경하며 메시지 소비 속도를 실험하는 코드입니다.

TODO:
1. Kafka 프로듀서를 생성하여 메시지를 발행합니다.
2. Kafka 컨슈머를 생성하고 `fetch.min.bytes` 및 `max.poll.records` 값을 변경합니다.
3. 서로 다른 설정에서 메시지 소비 속도를 비교합니다.
4. 메시지를 모두 소비할 때까지 걸린 시간을 출력합니다.
"""

from kafka import KafkaProducer, KafkaConsumer
import time
import json

In [2]:
# 설정 값
BROKER = "localhost:9092"
TOPIC = "test-topic"
FETCH_SIZES = [1024, 10240, 51200]  # 1KB, 10KB, 50KB
POLL_RECORDS = [10, 100, 500]  # 10개, 100개, 500개
NUM_MESSAGES = 50000  # 총 메시지 개수
MESSAGE_SIZE = 100  # 메시지 크기를 100바이트로 설정

In [3]:
# TODO 1: 100바이트 크기의 JSON 메시지를 생성
MESSAGE_PAYLOAD = json.dumps({"data": "A" * (MESSAGE_SIZE - 20)}).encode('utf-8')  # 메시지 크기 설정

In [4]:
# TODO 2: Kafka 프로듀서를 생성하고 메시지 발행
print("Producing messages...")

producer = KafkaProducer(
    bootstrap_servers=BROKER,  # Kafka 브로커 주소 설정
    batch_size=32768,  # 배치 크기 설정
    linger_ms=5,  # 배치를 적절히 활용하도록 설정
    acks=-1  # 메시지 전송 확인 설정
)

Producing messages...


In [5]:
# TODO 3: NUM_MESSAGES 개수만큼 메시지 전송
start_time = time.time()  # 메시지 전송 시작 시간 기록
for _ in range(NUM_MESSAGES):  # 메시지 전송 반복 횟수
    producer.send(TOPIC, MESSAGE_PAYLOAD)  # 메시지 전송

producer.flush()  # 모든 메시지 전송 완료
elapsed_time = time.time() - start_time  # 경과 시간 측정
print(f"Produced {NUM_MESSAGES} messages in {elapsed_time:.3f} sec\n")

Produced 50000 messages in 0.980 sec



In [6]:
# TODO 4: 서로 다른 Fetch size 및 Poll 간격 설정을 조합하여 컨슈머 테스트
for fetch_size in FETCH_SIZES:
    for poll_records in POLL_RECORDS:
        print(f"Testing fetch.min.bytes = {fetch_size}, max.poll.records = {poll_records}...")

        # TODO 5: Kafka 컨슈머를 생성하고 설정 변경
        consumer = KafkaConsumer(
            TOPIC,
            bootstrap_servers=BROKER,  # Kafka 브로커 주소 설정
            auto_offset_reset='earliest',  # 오프셋 초기화 방식 설정
            enable_auto_commit=False,  # 자동 오프셋 커밋 여부 설정
            fetch_min_bytes=fetch_size,  # 최소 Fetch 크기 설정
            max_poll_records=poll_records  # 최대 Poll 개수 설정
        )

        # TODO 6: 메시지 소비 시작 시간 기록
        start_time = time.time()

        # TODO 7: NUM_MESSAGES 개수만큼 메시지 소비
        message_count = 0
        for message in consumer:
            message_count += 1
            if message_count >= NUM_MESSAGES:  # 원하는 메시지 개수 도달 시 종료
                break

        # TODO 8: 모든 메시지 소비 완료 후 시간 측정
        elapsed_time = time.time() - start_time

        # TODO 9: 결과 출력
        print(f"Fetch size: {fetch_size}, Poll records: {poll_records}, Time taken: {elapsed_time:.3f} sec\n")

        # TODO 10: 테스트 간 간격 추가
        time.sleep(2)  # 테스트 간 2초 대기

Testing fetch.min.bytes = 1024, max.poll.records = 10...
Fetch size: 1024, Poll records: 10, Time taken: 0.407 sec

Testing fetch.min.bytes = 1024, max.poll.records = 100...
Fetch size: 1024, Poll records: 100, Time taken: 0.331 sec

Testing fetch.min.bytes = 1024, max.poll.records = 500...
Fetch size: 1024, Poll records: 500, Time taken: 0.327 sec

Testing fetch.min.bytes = 10240, max.poll.records = 10...
Fetch size: 10240, Poll records: 10, Time taken: 0.362 sec

Testing fetch.min.bytes = 10240, max.poll.records = 100...
Fetch size: 10240, Poll records: 100, Time taken: 0.343 sec

Testing fetch.min.bytes = 10240, max.poll.records = 500...
Fetch size: 10240, Poll records: 500, Time taken: 0.331 sec

Testing fetch.min.bytes = 51200, max.poll.records = 10...
Fetch size: 51200, Poll records: 10, Time taken: 0.355 sec

Testing fetch.min.bytes = 51200, max.poll.records = 100...
Fetch size: 51200, Poll records: 100, Time taken: 0.334 sec

Testing fetch.min.bytes = 51200, max.poll.records = 