# Dev
- 2020.12.06 DB -> Elasticsearch 자동화 코드

In [None]:
from typing import NamedTuple
from kfp.components import func_to_container_op, InputPath, OutputPath
from kfp import dsl
import kfp
import kfp.compiler as compiler
import kfp.components as comp

# Bike DB -> Component(Docker image)
def get_data_op(file_path: OutputPath('json')) -> int:
    """Export the current minute's bike-station rows from Postgres as JSON.

    Reads the rows of the ``bike`` table whose ``일시`` is at or after the
    current minute, looks up an address for every row through the vworld
    reverse-geocoding API to fill ``권역명``, and writes the result to
    ``file_path`` as a JSON list of records.

    Imports and helpers are kept inside the function body because the
    function is handed to ``func_to_container_op`` in this file.

    Args:
        file_path: Path of the JSON file to write.

    Returns:
        0 when the snapshot time is exactly 00:00, otherwise 1.
    """
    # Not referenced directly; presumably the driver behind the
    # postgresql:// URL below, imported so a missing driver fails early.
    import psycopg2  # noqa: F401
    import pandas as pd
    import json
    import time
    import requests  # original note: "not installed" (presumably in the base image)
    from sqlalchemy import create_engine
    from datetime import datetime
    print('패키지 불러오기 성공 !')

    # NOTE(review): the origin of this apiKey is unclear, and it is
    # hard-coded in source; consider moving it to a secret.
    def get_addr(X, Y):
        """Return the 'ADDR' field of the vworld response, or '' if absent."""
        apiKey = '95B9D6ED-C2DB-3B0A-A43F-20FD442638CF'
        # Adjacent literals instead of a backslash continuation inside the
        # string: the continuation embedded the next line's indentation
        # (a run of spaces) into the query string.
        url = (
            f'http://apis.vworld.kr/coord2jibun.do?x={X}&y={Y}'
            f'&apiKey={apiKey}&domain=http://map.vworld.kr/&output=json'
        )
        location = requests.get(url).json()
        try:
            return location['ADDR']
        except (KeyError, TypeError):
            # Response without an 'ADDR' entry (or not a mapping at all).
            return ''

    def second_token(addr):
        """Return the second space-separated token, or ``addr`` if there is none."""
        parts = addr.split(' ')
        return parts[1] if len(parts) > 1 else addr

    time.sleep(20)
    # One clock reading for date, hour and minute so the three values can
    # never straddle a minute (or midnight) boundary.
    snapshot = datetime.now()
    NOW_DATE = snapshot.date()
    NOW_HOUR = snapshot.hour
    NOW_MINUTE = snapshot.minute

    # NOTE(review): database credentials are hard-coded in source; consider
    # injecting them through a secret.
    engine = create_engine("postgresql://postgres:6team123!@restored-aurora.cj92narf3bwn.ap-northeast-2.rds.amazonaws.com:5432/final_project")
    query = f"""
    SELECT 대여소이름, 위도, 경도, 잔여대수, 거치율, 일시
    FROM bike
    WHERE 일시>='{NOW_DATE} {NOW_HOUR}:{NOW_MINUTE}';
    """
    # NOTE(review): 'created_at' / 'updated_at' are not among the selected
    # columns; kept unchanged from the original call -- confirm and drop.
    df = pd.read_sql(
        query,
        con=engine,
        parse_dates=['created_at', 'updated_at'],
    )
    print('Postgres Engine 생성 완료 !')

    # Built as a plain list (one lookup per row) so that an empty result set
    # simply yields an empty column.
    df['권역명'] = [
        get_addr(lon, lat) for lon, lat in zip(df['경도'], df['위도'])
    ]
    df['권역명'] = df['권역명'].apply(second_token)

    # Rows whose lookup returned no address get a name from this fixed list.
    # NOTE(review): names are paired positionally with the first rows that
    # came back empty; presumably this matches four known stations -- confirm.
    gu = ['은평구', '은평구', '성북구', '강남구']
    missing = df[df['권역명'] == '']
    target = [
        (row['위도'], row['경도'], name)
        for (_, row), name in zip(missing.iterrows(), gu)
    ]
    try:
        for idx, row in missing.iterrows():
            for lat, lon, name in target:
                if row['위도'] == lat and row['경도'] == lon:
                    df.at[idx, '권역명'] = name
        print('권역명 Null값 보정 성공 !')
    except Exception:
        # Best effort: a failed correction is reported but does not abort.
        print('[Error]권역명 Null값 보정 실패 !')

    df['location'] = df['위도'].astype('str') + ',' + df['경도'].astype('str')
    # Explicit copy so the column assignment below does not write to a slice.
    df = df[['대여소이름', '거치율', '잔여대수', '일시', 'location', '권역명']].copy()
    df['일시'] = df['일시'].apply(lambda x: x.isoformat())
    print('데이터 전처리 완료 !')

    documents = df.to_dict(orient='records')
    print('Dataframe 데이터 json 변환 완료 ')

    with open(file_path, 'w') as writer:
        json.dump(documents, writer)

    NOW = datetime.now()
    length = len(documents)
    print(f'[{NOW}]DB로부터 {length}개의 document 생성 완료!')

    if NOW_HOUR == 0 and NOW_MINUTE == 0:
        return 0
    return 1
    
    
def elasticsearch_create_index_op():
    """Create today's ``ddareungi-<date>`` index in Elasticsearch if missing.

    Connects to the cluster, reports whether the ping succeeded, and creates
    the index with explicit field mappings unless an index of that name is
    already listed by the cluster.
    """
    import elasticsearch
    import datetime as dt
    from datetime import datetime
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    # NOTE(review): cluster credentials are hard-coded in the URL.
    client = Elasticsearch(hosts="https://elastic:brYj4OIjnlBAdj0uZ4TFWhF0@90f0365b5bfa4dbf9aa9b43b4af8e16b.ap-northeast-2.aws.elastic-cloud.com:9243")
    # A failed ping is only reported; execution continues either way.
    print('Elasticsearch 연결 성공!' if client.ping() else 'Elasticsearch 연결 실패!')

    # Field name -> Elasticsearch type, in the order they are declared.
    field_types = {
        "대여소이름": "keyword",
        "거치율": "integer",
        "잔여대수": "integer",
        "일시": "date",
        "location": "geo_point",
        "권역명": "keyword",
    }
    index_body = {
        "mappings": {
            "properties": {
                field: {"type": es_type}
                for field, es_type in field_types.items()
            }
        }
    }

    today = datetime.today().date()
    stamp = datetime.now()
    index_name = f'ddareungi-{today}'

    # The response is keyed by index name, so membership means "exists".
    known_indices = client.indices.get_alias("*")
    if index_name in known_indices:
        print(f'[{stamp}]ddareungi-{today} 인덱스가 이미 존재합니다!')
        return

    client.indices.create(index=index_name, body=index_body)
    print(f'[{stamp}]ddareungi-{today} 인덱스 생성 완료!')

    
# Component(Docker image) -> Elasticsearch
def elasticsearch_insert_doc_op(file_path: InputPath('json')):
    """Bulk-index the documents stored at ``file_path`` into today's index.

    Loads the JSON content of ``file_path`` and sends it to the
    ``ddareungi-<date>`` index with the ``bulk`` helper.

    Args:
        file_path: Path of the JSON file holding the documents to index.
    """
    import elasticsearch
    import pandas as pd
    import datetime as dt
    import json
    from datetime import datetime
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    with open(file_path, 'r') as source:
        payload = json.load(source)

    # NOTE(review): cluster credentials are hard-coded in the URL.
    client = Elasticsearch(hosts="https://elastic:brYj4OIjnlBAdj0uZ4TFWhF0@90f0365b5bfa4dbf9aa9b43b4af8e16b.ap-northeast-2.aws.elastic-cloud.com:9243")
    # A failed ping is only reported; the bulk call below still runs.
    if client.ping():
        print('Elasticsearch 연결 성공!')
    else:
        print('Elasticsearch 연결 실패!')

    today = datetime.today().date()
    bulk(client, payload, stats_only=True, index=f'ddareungi-{today}')

    finished_at = datetime.now()
    count = len(payload)
    print(f'[{finished_at}]Elasticsearch로 {count}개의 document 삽입 완료!')

# Create Operator
# Every component runs on the same base image; only the data-loading
# component additionally installs `requests` at start-up.
base_image = '941102633028.dkr.ecr.ap-northeast-2.amazonaws.com/6team:es_ko_KST'

get_data_op_ = func_to_container_op(
    get_data_op,
    base_image=base_image,
    packages_to_install=['requests==2.22.0'],
)

elasticsearch_create_index_op_ = func_to_container_op(
    elasticsearch_create_index_op,
    base_image=base_image,
)

elasticsearch_insert_doc_op_ = func_to_container_op(
    elasticsearch_insert_doc_op,
    base_image=base_image,
)

@dsl.pipeline(
    name='data-to-elasticsearch',
    description='to insert data to elasticsearch'
)
def elasticsearch_pipeline():
    # The data step yields a flag ('output') and a JSON file ('file').
    extract = get_data_op_()
    flag = extract.outputs['output']
    documents = extract.outputs['file']

    # Flag 0: create the index first, then insert once that step is done.
    with dsl.Condition(flag == 0):
        make_index = elasticsearch_create_index_op_()
        insert_after_create = elasticsearch_insert_doc_op_(documents)
        insert_after_create.after(make_index)

    # Flag 1: insert only.
    with dsl.Condition(flag == 1):
        elasticsearch_insert_doc_op_(documents)
        


# Compile the pipeline into "<function name>.pipeline.zip" in the working directory.
compiler.Compiler().compile(
    elasticsearch_pipeline,
    f'{elasticsearch_pipeline.__name__}.pipeline.zip',
)