In [None]:
from kafka import KafkaConsumer
import sys, json, pymysql, pymongo

if __name__ == "__main__":

    # 設定要連線到Kafka集群的相關設定, 產生一個Kafka的Consumer的實例
    consumer1 = KafkaConsumer(
        # 指定Kafka集群伺服器
        bootstrap_servers=["kafka:9092"],
        # ConsumerGroup的名稱, 可以不指定
        #group_id="cg_001",
        # 指定msgKey的反序列化器, 若Key為None, 無法反序列化
        # key_deserializer=bytes.decode,
        # 指定msgValue的反序列化器
        #value_deserializer=bytes.decode,
        value_deserializer=lambda m: json.loads(m.decode('ascii')),
        # 是否從這個ConsumerGroup尚未讀取的partition / offset開始讀
        auto_offset_reset="earliest",
    )
   
    # 讓Consumer向Kafka集群訂閱指定的topic
    consumer1.subscribe(topics="temperature")
    
    # 持續的拉取Kafka有進來的訊息
    try:
        print("Now listening for incoming messages ...")
        # 持續監控是否有新的record進來
        for record in consumer1:
            topic = record.topic
            partition = record.partition
            offset = record.offset
            timestamp = record.timestamp
            # 取出msgKey與msgValue
            msgKey = record.key
            msgValue = record.value
            # 秀出metadata與msgKey & msgValue訊息
            print("topic=%s, partition=%s, offset=%s : (key=%s, value=%s)" % (record.topic, record.partition, 
                                                                              record.offset, record.key, record.value))

    except:
        # 錯誤處理
        e_type, e_value, e_traceback = sys.exc_info()
        print("type ==> %s" % (e_type))
        print("value ==> %s" % (e_value))
        print("traceback ==> file name: %s" % (e_traceback.tb_frame.f_code.co_filename))
        print("traceback ==> line no: %s" % (e_traceback.tb_lineno))
        print("traceback ==> function name: %s" % (e_traceback.tb_frame.f_code.co_name))
    finally:
        consumer1.close()