# ELK STACK
- 본자료는 허민석님의 ELK 강의를 기반으로 Python Client와 Elasticsearch 7.9버전에 맞게 재구성한 내용입니다
- Sources:
    - https://github.com/minsuk-heo/BigData
    - https://www.youtube.com/watch?v=J2PIBQgEpC4&list=PLVNY1HnUlO24LCsgOxR_eK2Yi4sOgH9Pg&index=1
    - https://kb.objectrocket.com/elasticsearch/how-to-use-python-helpers-to-bulk-load-data-into-an-elasticsearch-index
    - https://docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/es-managedomains-logstash.html

## 1. 라이브러리 설치 및 Import

In [None]:
import sys
!{sys.executable} -m pip install elasticsearch



In [None]:
from elasticsearch import Elasticsearch, helpers
import pprint

## 2. Elasticsearch 연결

In [None]:
# Connection settings - fill these in before running the cell.
user = ""
password = ""
host = ""

# When the cell is re-run, close the previous client before creating a new one.
try:
    es.transport.close()
except NameError:
    # First run of the cell: there is no previous client to close.
    pass
except Exception as e:
    # Best-effort cleanup: a failed close must not prevent reconnecting,
    # but it should not be hidden either.
    print("\nWARNING: could not close the previous connection:", e)
es = Elasticsearch([host],http_auth=(user,password),scheme="https",port=443)



## 3. Classes라는 인덱스 생성

In [None]:
INDEX_NAME = "classes"

# Start from a clean slate: remove the index if an earlier run left it behind,
# then create it again.
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

es.indices.create(index=INDEX_NAME)

In [None]:
# Read the index definition back to confirm that it was created.
result = es.indices.get(index=INDEX_NAME)
pprint.pprint(result)

## 4. Mapping 추가
- RDB로 치면 Schema

In [None]:
# Explicit field mapping for the index (the counterpart of a schema in an RDB).
body = {
    "properties": {
        "title": {"type": "text"},
        "professor": {"type": "text"},
        "major": {"type": "text"},
        "semester": {"type": "text"},
        "student_count": {"type": "integer"},
        "unit": {"type": "integer"},
        "rating": {"type": "integer"},
        "submit_date": {"type": "date", "format": "yyyy-MM-dd"},
        "school_location": {"type": "geo_point"},
    }
}
es.indices.put_mapping(index=INDEX_NAME, body=body)

In [None]:
# Read the index definition back to confirm that the mapping was added.
result = es.indices.get(index=INDEX_NAME)
pprint.pprint(result)

### Bulk 인덱싱을 위한 함수들
- https://kb.objectrocket.com/elasticsearch/how-to-use-python-helpers-to-bulk-load-data-into-an-elasticsearch-index

In [None]:
import uuid
def get_data_from_file(file_name, strip_spaces=True):
    """Read a text file and return its lines as a list of strings.

    Args:
        file_name: Path of the file to read. A bare file name (one without a
            directory part) is resolved against the current working directory.
        strip_spaces: When True (the default, which keeps the original
            behaviour) every space character is removed from each line.
            Pass False to keep the spaces inside a line and only trim the
            surrounding whitespace.

    Returns:
        One string per line of the file, in file order. Blank lines are
        returned as empty strings.
    """
    # The original fallback for bare file names called script_path(), which is
    # not defined in this notebook and raised NameError. Opening the name as-is
    # resolves it against the working directory instead.
    # The context manager also closes the file if reading fails part-way.
    with open(file_name, encoding="utf8", errors='ignore') as file:
        lines = [line.strip() for line in file]
    if strip_spaces:
        lines = [line.replace(" ", "") for line in lines]
    return lines


def bulk_json_data(json_file, _index):
    """Yield one action dict per document of a bulk-format file.

    The file is read as alternating lines: an action line such as
    {"index": {"_id": "1"}} followed by the document source line. Each yielded
    dict has "_index", "_source" (the source line as a JSON string) and, when
    the preceding action line carried one, "_id".

    Args:
        json_file: Path of the bulk-format file.
        _index: Name of the index every document is sent to.

    Yields:
        Action dicts in the shape consumed by helpers.bulk.
    """
    # Local import: the notebook never imports json at top level, so the
    # original json.loads call failed with NameError.
    import json

    _id = None
    # Lines are read with their spaces intact; stripping every space (as the
    # default of get_data_from_file does) would also delete the spaces inside
    # string values, e.g. "Machine Learning" -> "MachineLearning".
    for line in get_data_from_file(json_file, strip_spaces=False):
        if not line:
            # Blank lines (e.g. a trailing newline) are not documents.
            continue
        # Spaces are removed only for this check, so that both
        # '{ "index" : ...' and '{"index":...' are recognised as action lines.
        if '{"index"' in line.replace(" ", ""):
            _id = json.loads(line)["index"].get("_id")
            continue
        action = {"_index": _index}
        if _id is not None:
            action["_id"] = _id
        action["_source"] = line
        yield action
        # An id belongs to exactly one document; do not let it leak onto a
        # later document that has no action line of its own.
        _id = None

## Classes 데이터 인덱싱

In [None]:
# Bulk-load the classes data; a failure is reported instead of stopping the notebook.
try:
    response = helpers.bulk(es, bulk_json_data("data/classes.json", INDEX_NAME))
except Exception as e:
    print("\nERROR:", e)
else:
    print("\nRESPONSE:", response)

In [None]:
# Fetch the document with id 1 to confirm that the data was indexed.
doc = es.get(index=INDEX_NAME, id=1)
pprint.pprint(doc)

## Elasticsearch의 Search 함수 활용해보기

In [None]:
# From here on the notebook works with the basketball index.
INDEX_NAME = "basketball"

try:
    response = helpers.bulk(es, bulk_json_data("data/bulk_basketball.json", INDEX_NAME))
except Exception as e:
    print("\nERROR:", e)
else:
    print("\nRESPONSE:", response)

In [None]:
# Search the index without passing a query body and print the raw response.
res = es.search(index=INDEX_NAME)
pprint.pprint(res)

In [None]:
# Term query on the "points" field for the value 30.
body = {"query": {"term": {"points": 30}}}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

## Elasticsearch의 Aggregation 활용해보기

In [None]:
INDEX_NAME = "basketball"

# Drop the index if it exists so the load below starts from an empty index.
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

try:
    response = helpers.bulk(es, bulk_json_data("data/simple_basketball.json", INDEX_NAME))
except Exception as e:
    print("\nERROR:", e)
else:
    print("\nRESPONSE:", response)

In [None]:
# "avg" aggregation over the "points" field, reported under the name "avg_score".
body = {
    "size": 0,
    "aggs": {"avg_score": {"avg": {"field": "points"}}},
}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

In [None]:
# "max" aggregation over the "points" field, reported under the name "max_score".
body = {
    "size": 0,
    "aggs": {"max_score": {"max": {"field": "points"}}},
}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

In [None]:
# "min" aggregation over the "points" field, reported under the name "min_score".
body = {
    "size": 0,
    "aggs": {"min_score": {"min": {"field": "points"}}},
}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

In [None]:
# "sum" aggregation over the "points" field, reported under the name "sum_score".
body = {
    "size": 0,
    "aggs": {"sum_score": {"sum": {"field": "points"}}},
}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

In [None]:
# "stats" aggregation over the "points" field, reported under the name "stats_score".
body = {
    "size": 0,
    "aggs": {"stats_score": {"stats": {"field": "points"}}},
}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

## Elasticsearch의 Bucket Aggregation 활용해보기

In [None]:
INDEX_NAME = "basketball"

# Remove the index left over from the previous section, then create it empty
# so that an explicit mapping can be applied before loading data.
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

es.indices.create(index=INDEX_NAME)

In [None]:
# fielddata is set to True on the text fields ("team", "name") so that
# terms aggregations can be run on them.
body = {
    "properties": {
        "team": {"type": "text", "fielddata": True},
        "name": {"type": "text", "fielddata": True},
        "points": {"type": "long"},
        "rebounds": {"type": "long"},
        "assists": {"type": "long"},
        "blocks": {"type": "long"},
        "submit_date": {"type": "date", "format": "yyyy-MM-dd"},
    }
}

es.indices.put_mapping(index=INDEX_NAME, body=body)

In [None]:
# Bulk-load the two-team data set; a failure is reported instead of stopping the notebook.
try:
    response = helpers.bulk(es, bulk_json_data("data/twoteam_basketball.json", INDEX_NAME))
except Exception as e:
    print("\nERROR:", e)
else:
    print("\nRESPONSE:", response)

In [None]:
# "terms" aggregation on the "team" field, reported under the name "players".
body = {
    "size": 0,
    "aggs": {"players": {"terms": {"field": "team"}}},
}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

In [None]:
# "terms" aggregation on "team" with a nested "stats" aggregation on "points".
body = {
    "size": 0,
    "aggs": {
        "team_stats": {
            "terms": {"field": "team"},
            "aggs": {"stats_score": {"stats": {"field": "points"}}},
        }
    },
}
res = es.search(index=INDEX_NAME, body=body)
pprint.pprint(res)

## Kibana 활용해보기

In [None]:
INDEX_NAME = "basketball"

# Remove the index left over from the previous section, then create it empty
# so that an explicit mapping can be applied before loading data.
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

es.indices.create(index=INDEX_NAME)

In [None]:
# fielddata is set to True on the text fields ("team", "name") so that
# terms aggregations can be run on them.
body = {
    "properties": {
        "team": {"type": "text", "fielddata": True},
        "name": {"type": "text", "fielddata": True},
        "points": {"type": "long"},
        "rebounds": {"type": "long"},
        "assists": {"type": "long"},
        "blocks": {"type": "long"},
        "submit_date": {"type": "date", "format": "yyyy-MM-dd"},
    }
}

es.indices.put_mapping(index=INDEX_NAME, body=body)

In [None]:
# Bulk-load the second basketball data set; a failure is reported instead of stopping the notebook.
try:
    response = helpers.bulk(es, bulk_json_data("data/bulk_basketball2.json", INDEX_NAME))
except Exception as e:
    print("\nERROR:", e)
else:
    print("\nRESPONSE:", response)