In [3]:
from kafka import KafkaConsumer
import sys, json, pymysql, pymongo

if __name__ == "__main__":

    # 與 MongoDB連線
    client = pymongo.MongoClient(host="mongodb", port=27017)
    # 指定為 test 資料庫
    db = client.test
    # 指定 temp_humidity 集合, MongoDB的每個資料庫又包含許多集合(collection), 類似於關聯性資料庫中的表
    collection = db.temp_humidity

    # 設定要連線到Kafka集群的相關設定, 產生一個Kafka的Consumer的實例
    consumer = KafkaConsumer(
        # 指定Kafka集群伺服器
        bootstrap_servers=["kafka:9092"],
        # ConsumerGroup的名稱, 可以不指定
        #group_id="cg_001",
        # 指定msgKey的反序列化器, 若Key為None, 無法反序列化
        # key_deserializer=bytes.decode,
        # 指定msgValue的反序列化器
        #value_deserializer=bytes.decode,
        value_deserializer=lambda m: json.loads(m.decode('ascii')),
        # 是否從這個ConsumerGroup尚未讀取的partition / offset開始讀
        auto_offset_reset="earliest",
    )
   
    # 讓Consumer向Kafka集群訂閱指定的topic
    consumer.subscribe(topics="TEMP_HUMIDITY_ENRICHED")
    
    # 持續的拉取Kafka有進來的訊息
    try:
        print("Now listening for incoming messages ...")
        # 持續監控是否有新的record進來
        for record in consumer:
            topic = record.topic
            partition = record.partition
            offset = record.offset
            timestamp = record.timestamp
            # 取出msgKey與msgValue
            msgKey = record.key
            msgValue = record.value
            # 秀出metadata與msgKey & msgValue訊息
            print("topic=%s, partition=%s, offset=%s : (key=%s, value=%s)" % (record.topic, record.partition, 
                                                                              record.offset, record.key, record.value))
            j = {"device_id": msgValue["T_DEVICE_ID"], "timestamp": msgValue["T_TIMESTAMP"], 
                  "temperature": msgValue["TEMPERATURE"], "humidity": msgValue["HUMIDITY"], "rd": msgValue["T_RD"]}
            print("json=",  j)
            
            # 將資料存入 mongodb
            # 存入單筆
            result = collection.insert_one(j)
            # 存入多筆
            #result = collection.insert_many()
            print(result)

    except:
        # 錯誤處理
        e_type, e_value, e_traceback = sys.exc_info()
        print("type ==> %s" % (e_type))
        print("value ==> %s" % (e_value))
        print("traceback ==> file name: %s" % (e_traceback.tb_frame.f_code.co_filename))
        print("traceback ==> line no: %s" % (e_traceback.tb_lineno))
        print("traceback ==> function name: %s" % (e_traceback.tb_frame.f_code.co_name))
    finally:
        consumer.close()

Now listening for incoming messages ...
topic=TEMP_HUMIDITY_ENRICHED, partition=0, offset=0 : (key=b'0.1774516791151951', value={'T_DEVICE_ID': '001', 'T_TIMESTAMP': '2019-09-19 08:01:58', 'TEMPERATURE': 29.88, 'HUMIDITY': 100, 'T_RD': '0.1774516791151951'})
json= {'device_id': '001', 'timestamp': '2019-09-19 08:01:58', 'temperature': 29.88, 'humidity': 100, 'rd': '0.1774516791151951'}
<pymongo.results.InsertOneResult object at 0x7fb860394888>
topic=TEMP_HUMIDITY_ENRICHED, partition=0, offset=1 : (key=b'0.07274631587036307', value={'T_DEVICE_ID': '001', 'T_TIMESTAMP': '2019-09-19 08:02:00', 'TEMPERATURE': 29.32, 'HUMIDITY': 63, 'T_RD': '0.07274631587036307'})
json= {'device_id': '001', 'timestamp': '2019-09-19 08:02:00', 'temperature': 29.32, 'humidity': 63, 'rd': '0.07274631587036307'}
<pymongo.results.InsertOneResult object at 0x7fb860394988>
topic=TEMP_HUMIDITY_ENRICHED, partition=0, offset=2 : (key=b'0.6662571651231061', value={'T_DEVICE_ID': '001', 'T_TIMESTAMP': '2019-09-19 08:02

In [19]:
from kafka import KafkaConsumer
import sys, json, pymysql, pymongo
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import time

if __name__ == "__main__":

    # 與 MongoDB連線
    #client = pymongo.MongoClient(host="mongodb", port=27017)
    # 指定為 test 資料庫
    #db = client.test
    # 指定 temp_humidity 集合, MongoDB的每個資料庫又包含許多集合(collection), 類似於關聯性資料庫中的表
    #collection = db.temp_humidity

    # 設定要連線到Kafka集群的相關設定, 產生一個Kafka的Consumer的實例
    consumer = KafkaConsumer(
        # 指定Kafka集群伺服器
        bootstrap_servers=["kafka:9092"],
        # ConsumerGroup的名稱, 可以不指定
        #group_id="cg_001",
        # 指定msgKey的反序列化器, 若Key為None, 無法反序列化
        # key_deserializer=bytes.decode,
        # 指定msgValue的反序列化器
        #value_deserializer=bytes.decode,
        value_deserializer=lambda m: json.loads(m.decode('ascii')),
        # 是否從這個ConsumerGroup尚未讀取的partition / offset開始讀
        auto_offset_reset="earliest",
    )
   
    # 讓Consumer向Kafka集群訂閱指定的topic
    consumer.subscribe(topics="TEMP_HUMIDITY_ENRICHED")
    
    es = Elasticsearch('http://elasticsearch:9200')
    
    
    # 持續的拉取Kafka有進來的訊息
    try:
        print("Now listening for incoming messages ...")
        # 持續監控是否有新的record進來
        for record in consumer:
            topic = record.topic
            partition = record.partition
            offset = record.offset
            timestamp = record.timestamp
            # 取出msgKey與msgValue
            msgKey = record.key
            msgValue = record.value
            # 秀出metadata與msgKey & msgValue訊息
            print("topic=%s, partition=%s, offset=%s : (key=%s, value=%s)" % (record.topic, record.partition, 
                                                                              record.offset, record.key, record.value))
            j = {"device_id": msgValue["T_DEVICE_ID"], "timestamp": datetime.now(), 
                  "temperature": msgValue["TEMPERATURE"], "humidity": msgValue["HUMIDITY"], "rd": msgValue["T_RD"]}
            print("json=",  j)
            
            # 將資料存入 mongodb
            # 存入單筆
            #result = collection.insert_one(j)
            # 存入多筆
            #result = collection.insert_many()
            print(result)
            res = es.index(index="temp_humidity1", doc_type='tweet', id=datetime.now(), body=j)
            print(res['result'])

    except:
        # 錯誤處理
        e_type, e_value, e_traceback = sys.exc_info()
        print("type ==> %s" % (e_type))
        print("value ==> %s" % (e_value))
        print("traceback ==> file name: %s" % (e_traceback.tb_frame.f_code.co_filename))
        print("traceback ==> line no: %s" % (e_traceback.tb_lineno))
        print("traceback ==> function name: %s" % (e_traceback.tb_frame.f_code.co_name))
    finally:
        consumer.close()

Now listening for incoming messages ...
topic=TEMP_HUMIDITY_ENRICHED, partition=3, offset=0 : (key=b'0.14562536344048926', value={'T_DEVICE_ID': '001', 'T_TIMESTAMP': '2019-09-19 08:02:05', 'TEMPERATURE': 27.67, 'HUMIDITY': 26, 'T_RD': '0.14562536344048926'})
json= {'device_id': '001', 'timestamp': datetime.datetime(2019, 9, 19, 8, 47, 15, 313819), 'temperature': 27.67, 'humidity': 26, 'rd': '0.14562536344048926'}
<pymongo.results.InsertOneResult object at 0x7fb861402a88>
created
topic=TEMP_HUMIDITY_ENRICHED, partition=3, offset=1 : (key=b'0.9677200796285044', value={'T_DEVICE_ID': '001', 'T_TIMESTAMP': '2019-09-19 08:02:07', 'TEMPERATURE': 25.2, 'HUMIDITY': 90, 'T_RD': '0.9677200796285044'})
json= {'device_id': '001', 'timestamp': datetime.datetime(2019, 9, 19, 8, 47, 15, 319125), 'temperature': 25.2, 'humidity': 90, 'rd': '0.9677200796285044'}
<pymongo.results.InsertOneResult object at 0x7fb861402a88>
created
topic=TEMP_HUMIDITY_ENRICHED, partition=3, offset=2 : (key=b'0.016204755833

In [23]:
## step test
from kafka import KafkaConsumer
import sys, json, pymysql, pymongo


if __name__ == "__main__":

    # 與 MongoDB連線
    client = pymongo.MongoClient(host="mongodb", port=27017)
    # 指定為 test 資料庫
    db = client.test
    # 指定 temp_humidity 集合, MongoDB的每個資料庫又包含許多集合(collection), 類似於關聯性資料庫中的表
    collection = db.step

    # 設定要連線到Kafka集群的相關設定, 產生一個Kafka的Consumer的實例
    consumer = KafkaConsumer(
        # 指定Kafka集群伺服器
        bootstrap_servers=["kafka:9092"],
        # ConsumerGroup的名稱, 可以不指定
        #group_id="cg_001",
        # 指定msgKey的反序列化器, 若Key為None, 無法反序列化
        # key_deserializer=bytes.decode,
        # 指定msgValue的反序列化器
        #value_deserializer=bytes.decode,
        value_deserializer=lambda m: json.loads(m.decode('ascii')),
        # 是否從這個ConsumerGroup尚未讀取的partition / offset開始讀
        auto_offset_reset="earliest",
    )
   
    # 讓Consumer向Kafka集群訂閱指定的topic
    consumer.subscribe(topics="step")
    
    #es = Elasticsearch('http://elasticsearch:9200')
    
    
    # 持續的拉取Kafka有進來的訊息
    try:
        print("Now listening for incoming messages ...")
        # 持續監控是否有新的record進來
        for record in consumer:
            topic = record.topic
            partition = record.partition
            offset = record.offset
            timestamp = record.timestamp
            # 取出msgKey與msgValue
            msgKey = record.key
            msgValue = record.value
            # 秀出metadata與msgKey & msgValue訊息
            print("topic=%s, partition=%s, offset=%s : (key=%s, value=%s)" % (record.topic, record.partition, 
                                                                              record.offset, record.key, record.value))
            j = {"device_id": msgValue["ID"], "timestamp": msgValue["DateTime"], 
                 "X_unit": msgValue["X_unit"], "Y_unit": msgValue["Y_unit"], "Z_unit": msgValue["Z_unit"],
                 "Xr": msgValue["Xr"], "Yr": msgValue["Yr"], "Zr": msgValue["Zr"],
                 "Xr_unit": msgValue["Xr_unit"], "Yr_unit": msgValue["Yr_unit"], "Zr_unit": msgValue["Zr_unit"]}
            print("json=",  j)
            
            # 將資料存入 mongodb
            # 存入單筆
            result = collection.insert_one(j)
            # 存入多筆
            #result = collection.insert_many()
            print(result)
            #res = es.index(index="temp_humidity1", doc_type='tweet', id=datetime.now(), body=j)
            #print(res['result'])

    except:
        # 錯誤處理
        e_type, e_value, e_traceback = sys.exc_info()
        print("type ==> %s" % (e_type))
        print("value ==> %s" % (e_value))
        print("traceback ==> file name: %s" % (e_traceback.tb_frame.f_code.co_filename))
        print("traceback ==> line no: %s" % (e_traceback.tb_lineno))
        print("traceback ==> function name: %s" % (e_traceback.tb_frame.f_code.co_name))
    finally:
        consumer.close()

Now listening for incoming messages ...
topic=step, partition=0, offset=0 : (key=None, value={'Xr_unit': -0.09130859375, 'X_unit': -2, 'Zr_unit': 0.88671875, 'DateTime': '2019-09-19 16:55:39', 'Y_unit': -1, 'Y': -41, 'X': -198, 'Z': 109, 'ID': 'ausitn', 'Z_unit': 0, 'Yr_unit': 0.345703125, 'Xr': -1496, 'Yr': 5664, 'Zr': 14528})
json= {'device_id': 'ausitn', 'timestamp': '2019-09-19 16:55:39', 'X_unit': -2, 'Y_unit': -1, 'Z_unit': 0, 'Xr': -1496, 'Yr': 5664, 'Zr': 14528, 'Xr_unit': -0.09130859375, 'Yr_unit': 0.345703125, 'Zr_unit': 0.88671875}
<pymongo.results.InsertOneResult object at 0x7fb86264cf08>
topic=step, partition=0, offset=1 : (key=None, value={'Xr_unit': -0.089111328125, 'X_unit': -2, 'Zr_unit': 0.885009765625, 'DateTime': '2019-09-19 16:55:41', 'Y_unit': -1, 'Y': -44, 'X': -151, 'Z': 134, 'ID': 'ausitn', 'Z_unit': 1, 'Yr_unit': 0.349609375, 'Xr': -1460, 'Yr': 5728, 'Zr': 14500})
json= {'device_id': 'ausitn', 'timestamp': '2019-09-19 16:55:41', 'X_unit': -2, 'Y_unit': -1, 'Z_

json= {'device_id': 'ausitn', 'timestamp': '2019-09-19 16:55:52', 'X_unit': -2, 'Y_unit': -1, 'Z_unit': 1, 'Xr': -1568, 'Yr': 5420, 'Zr': 14512, 'Xr_unit': -0.095703125, 'Yr_unit': 0.330810546875, 'Zr_unit': 0.8857421875}
<pymongo.results.InsertOneResult object at 0x7fb8602d5408>
topic=step, partition=0, offset=98 : (key=None, value={'Xr_unit': -0.099609375, 'X_unit': -2, 'Zr_unit': 0.88232421875, 'DateTime': '2019-09-19 16:55:53', 'Y_unit': -1, 'Y': -38, 'X': -221, 'Z': 95, 'ID': 'ausitn', 'Z_unit': 0, 'Yr_unit': 0.34716796875, 'Xr': -1632, 'Yr': 5688, 'Zr': 14456})
json= {'device_id': 'ausitn', 'timestamp': '2019-09-19 16:55:53', 'X_unit': -2, 'Y_unit': -1, 'Z_unit': 0, 'Xr': -1632, 'Yr': 5688, 'Zr': 14456, 'Xr_unit': -0.099609375, 'Yr_unit': 0.34716796875, 'Zr_unit': 0.88232421875}
<pymongo.results.InsertOneResult object at 0x7fb860331108>
topic=step, partition=0, offset=99 : (key=None, value={'Xr_unit': -0.09521484375, 'X_unit': -1, 'Zr_unit': 0.886474609375, 'DateTime': '2019-09-1

<pymongo.results.InsertOneResult object at 0x7fb8602d5408>
topic=step, partition=0, offset=233 : (key=None, value={'Xr_unit': -0.093994140625, 'X_unit': -2, 'Zr_unit': 0.898193359375, 'DateTime': '2019-09-19 16:56:09', 'Y_unit': -1, 'Y': -41, 'X': -187, 'Z': 134, 'ID': 'ausitn', 'Z_unit': 1, 'Yr_unit': 0.350830078125, 'Xr': -1540, 'Yr': 5748, 'Zr': 14716})
json= {'device_id': 'ausitn', 'timestamp': '2019-09-19 16:56:09', 'X_unit': -2, 'Y_unit': -1, 'Z_unit': 1, 'Xr': -1540, 'Yr': 5748, 'Zr': 14716, 'Xr_unit': -0.093994140625, 'Yr_unit': 0.350830078125, 'Zr_unit': 0.898193359375}
<pymongo.results.InsertOneResult object at 0x7fb86264cf08>
topic=step, partition=0, offset=234 : (key=None, value={'Xr_unit': -0.10107421875, 'X_unit': -2, 'Zr_unit': 0.8798828125, 'DateTime': '2019-09-19 16:56:09', 'Y_unit': -1, 'Y': -51, 'X': -149, 'Z': 88, 'ID': 'ausitn', 'Z_unit': 0, 'Yr_unit': 0.34375, 'Xr': -1656, 'Yr': 5632, 'Zr': 14416})
json= {'device_id': 'ausitn', 'timestamp': '2019-09-19 16:56:09', 

type ==> <class 'KeyboardInterrupt'>
value ==> 
traceback ==> file name: <ipython-input-23-cc88af2f6c2c>
traceback ==> line no: 40
traceback ==> function name: <module>
