In [2]:
!pip install elasticsearch



In [3]:
import requests
url = 'http://192.168.33.10:9200'
data = requests.get(url)
data

<Response [200]>

In [4]:
from elasticsearch import Elasticsearch
es = Elasticsearch('http://192.168.33.10:9200')
es

<Elasticsearch([{'host': '192.168.33.10', 'port': 9200}])>

In [5]:
from datetime import datetime
doc = {
    'author': 'Justin',
    'text': '物聯網平台ELK實作',
    'timestamp': datetime.now()
}
doc

{'author': 'Justin',
 'text': '物聯網平台ELK實作',
 'timestamp': datetime.datetime(2020, 7, 30, 4, 40, 43, 163373)}

In [6]:
res = es.index(index='test', doc_type='elk', body=doc)
print(res['result'])

created


In [7]:
res = es.search(index='test', body={"query":{"match_all":{}}})
for hit in res['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

2020-07-29T17:02:39.607215 Justin: 物聯網平台ELK實作
2020-07-30T03:35:45.764282 Justin: 物聯網平台ELK實作


In [8]:
from kafka import KafkaConsumer
import sys, json, pymysql, pymongo
from elasticsearch import Elasticsearch
import datetime
import time
from IPython.display import clear_output, display

if __name__ == "__main__":

    # 與 MongoDB連線
    client = pymongo.MongoClient(host="mongodb", port=27017)
    # 指定為 test 資料庫
    db = client.test
    # 指定 temp_humidity 集合, MongoDB的每個資料庫又包含許多集合(collection), 類似於關聯性資料庫中的表
    collection = db.temp_humidity

    # 設定要連線到Kafka集群的相關設定, 產生一個Kafka的Consumer的實例
    consumer = KafkaConsumer(
        # 指定Kafka集群伺服器
        bootstrap_servers=["kafka:9092"],
        # ConsumerGroup的名稱, 可以不指定
        #group_id="cg_001",
        # 指定msgKey的反序列化器, 若Key為None, 無法反序列化
        # key_deserializer=bytes.decode,
        # 指定msgValue的反序列化器
        #value_deserializer=bytes.decode,
        value_deserializer=lambda m: json.loads(m.decode('ascii')),
        # 是否從這個ConsumerGroup尚未讀取的partition / offset開始讀
        auto_offset_reset="earliest",
        enable_auto_commit=True
    )
   
    # 讓Consumer向Kafka集群訂閱指定的topic
    consumer.subscribe(topics="TEMP_HUMIDITY_ENRICHED")
    
    # 持續的拉取Kafka有進來的訊息
    try:
        print("Now listening for incoming messages ...")
        # 持續監控是否有新的record進來
        for record in consumer:
            clear_output(wait=True)
            topic = record.topic
            partition = record.partition
            offset = record.offset
            timestamp = record.timestamp
            # 取出msgKey與msgValue
            msgKey = record.key
            msgValue = record.value
            # 秀出metadata與msgKey & msgValue訊息
            #print("topic=%s, partition=%s, offset=%s : (key=%s, value=%s)" % (record.topic, record.partition, 
            #                                                                  record.offset, record.key, record.value))
            #j = {"device_id": msgValue["T_DEVICE_ID"], "timestamp": msgValue["T_TIMESTAMP"], 
            #      "Temperature": msgValue["TEMPERATURE"], "Humidity": msgValue["HUMIDITY"]}
            #print("json=",  j)
            
            # 將資料存入 mongodb
            # 存入單筆
            #result = collection.insert_one(j)
            # 存入多筆
            #result = collection.insert_many()
            #print(result)
            
            #將資料寫入ES
            dt = datetime.datetime.strptime(msgValue["T_TIMESTAMP"], "%Y-%m-%d %H:%M:%S")
            #es_id = time.time()
            es_id = msgValue['T_RD']
            #print(dt)
            doc = {"device_id": msgValue["T_DEVICE_ID"], "timestamp": dt, "logtime": msgValue['T_TIMESTAMP'],
                  "Temperature": msgValue["TEMPERATURE"], "Humidity": msgValue["HUMIDITY"], "rd": msgValue['T_RD']}
            #print(doc)
            res = es.index(index='ltu-iot', doc_type='elk', id=es_id, body=doc)
            print("%s, rd= %s %s"%(dt, msgValue['T_RD'],res['result']))

    except:
        # 錯誤處理
        e_type, e_value, e_traceback = sys.exc_info()
        print("type ==> %s" % (e_type))
        print("value ==> %s" % (e_value))
        print("traceback ==> file name: %s" % (e_traceback.tb_frame.f_code.co_filename))
        print("traceback ==> line no: %s" % (e_traceback.tb_lineno))
        print("traceback ==> function name: %s" % (e_traceback.tb_frame.f_code.co_name))
    finally:
        consumer.close()

2020-07-30 05:13:30, rd= 1596086010347 created
type ==> <class 'KeyboardInterrupt'>
value ==> 
traceback ==> file name: <ipython-input-8-ad8b0d0ffd16>
traceback ==> line no: 40
traceback ==> function name: <module>
