# 基本概念

# 本地安裝與啟動（基於 Docker）

### 下載 zookeeper 鏡像與 kafka 鏡像

### 本地啟動 zookeeper

### 本地啟動 kafka

### 進入 kafka bash

### 創建名為 `kafka_demo` 的 Topic，分區數為 2

### 查看當前所有 topic

### 安裝 kafka-python

# 生產者（Producer）與消費者（Consumer）

In [None]:
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import traceback
import json


def producer_demo():
    """Send three JSON-encoded messages to partition 1 of topic 'kafka_demo'.

    Keys and values are serialized with ``json.dumps(...).encode()`` (a
    key/value pair is used here for illustration; keys are optional in
    Kafka). Each send is confirmed synchronously via ``future.get``. A failed
    send has its traceback printed instead of being silently discarded. The
    producer is closed on exit so its network resources are released.
    """
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode(),
        api_version=(2, 8, 1))

    try:
        # Send three messages: "0", "1", "2".
        for i in range(3):
            future = producer.send(
                'kafka_demo',
                # Without an explicit partition, equal keys map to the same
                # partition; here partition=1 is given, so it takes precedence.
                key='count_num',
                value=str(i),
                partition=1)
            print("send {}".format(str(i)))
            try:
                future.get(timeout=10)  # block until the send succeeds or fails
            except KafkaError:
                # The original called traceback.format_exc() and dropped the
                # returned string, silently swallowing the failure.
                traceback.print_exc()
    finally:
        producer.close()


def consumer_demo():
    """Print every message of topic 'kafka_demo' consumed by group 'test'.

    Keys and values are decoded as JSON. A missing (``None``) key or value is
    printed as ``None`` instead of crashing. Iteration stops once no message
    arrives for 10 seconds (``consumer_timeout_ms``), and the consumer is then
    closed.
    """
    consumer = KafkaConsumer(
        'kafka_demo',
        bootstrap_servers='localhost:9092',
        group_id='test',
        api_version=(2, 8, 1),
        consumer_timeout_ms=10000,  # stop iterating after 10 s with no new data
    )
    try:
        for message in consumer:
            # A record may be produced without a key (or with a null value).
            key = None if message.key is None else json.loads(message.key.decode())
            value = None if message.value is None else json.loads(message.value.decode())
            print("receive, key: {}, value: {}".format(key, value))
    finally:
        consumer.close()

In [None]:
# Run the consumer: it prints every message it receives and returns after
# 10 s without new messages (consumer_timeout_ms=10000).
# NOTE(review): inside one notebook kernel this cell blocks, so the producer
# cell below only runs after the consumer has returned; run the producer from
# a separate process/notebook to watch messages arrive live.
consumer_demo()

In [None]:
# Send three JSON-encoded messages ("0", "1", "2") with key "count_num" to
# partition 1 of topic 'kafka_demo'.
producer_demo()

# 消費者進階操作

### 初始化參數

### 手動 commit

In [None]:
def consumer_demo():
    """Print messages of topic 'kafka_demo' and commit offsets manually.

    Auto-commit is disabled (``enable_auto_commit=False``). ``consumer.commit()``
    is called after each message has been printed, so an offset is committed
    only once its message has been handled. No ``consumer_timeout_ms`` is set,
    so the loop runs until interrupted. The consumer is closed in ``finally``
    so an interrupted run does not leak it.
    """
    consumer = KafkaConsumer(
        'kafka_demo',
        bootstrap_servers='localhost:9092',
        group_id='test',
        enable_auto_commit=False
    )
    try:
        for message in consumer:
            # A record may be produced without a key (or with a null value).
            key = None if message.key is None else json.loads(message.key.decode())
            value = None if message.value is None else json.loads(message.value.decode())
            print("receive, key: {}, value: {}".format(key, value))
            consumer.commit()  # commit only after the message was processed
    finally:
        consumer.close()

### 查看 Kafka 消息堆積量（消費者組尚未消費的消息數）

In [None]:
# Compute the backlog ("lag") of a consumer group on a topic: the sum over all
# partitions of (log-end offset - committed offset).
# The original cell used TopicPartition without importing it, and used `topic`
# and `kwargs` without defining them anywhere in this notebook.
from kafka import TopicPartition

# NOTE(review): values match the demo above; adjust for your own topic/group.
# group_id is required for consumer.committed() to look up the group's offsets.
topic = 'kafka_demo'
kwargs = {
    'bootstrap_servers': 'localhost:9092',
    'group_id': 'test',
    'enable_auto_commit': False,  # this inspection consumer must never commit
}

consumer = KafkaConsumer(topic, **kwargs)
try:
    partition_ids = consumer.partitions_for_topic(topic)
    if partition_ids is None:  # topic unknown to the cluster
        raise ValueError("topic {!r} not found".format(topic))
    partitions = [TopicPartition(topic, p) for p in sorted(partition_ids)]

    print("start to cal offset:")

    # total: log-end offset of every partition, as (partition, offset) pairs
    end_offsets = consumer.end_offsets(partitions)
    toff = sorted((tp.partition, offset) for tp, offset in end_offsets.items())
    print("total offset: {}".format(str(toff)))

    # current: the group's committed offset per partition (None if never committed)
    coff = sorted((tp.partition, consumer.committed(tp)) for tp in partitions)
    print("current offset: {}".format(str(coff)))

    # cal sum and left: an uncommitted partition counts as committed offset 0
    # (presumably assumes the log still starts at offset 0 — TODO confirm)
    toff_sum = sum(offset for _, offset in toff)
    cur_sum = sum(offset for _, offset in coff if offset is not None)
    left_sum = toff_sum - cur_sum
    print("kafka left: {}".format(left_sum))
finally:
    consumer.close()