# BIG DATA ANALYTICS PROGRAMMING : Elasticsearch, Kibana
엘라스틱서치와 키바나의 기본 원리와 사용법에 대해서 배웁니다.
____
- 본자료는 허민석님의 ELK 강의를 기반으로 Python Client와 Elasticsearch 7.10버전에 맞게 재구성한 내용입니다
- Sources:
    - https://github.com/minsuk-heo/BigData
    - https://www.youtube.com/watch?v=J2PIBQgEpC4&list=PLVNY1HnUlO24LCsgOxR_eK2Yi4sOgH9Pg&index=1
    - https://kb.objectrocket.com/elasticsearch/how-to-use-python-helpers-to-bulk-load-data-into-an-elasticsearch-index
    - https://docs.aws.amazon.com/ko_kr/elasticsearch-service/latest/developerguide/es-managedomains-logstash.html

## 1. 라이브러리 설치 및 Import

In [None]:
import sys
!{sys.executable} -m pip install elasticsearch



In [1]:
from elasticsearch import Elasticsearch, helpers
import pprint

## 2. Elasticsearch 연결

In [2]:

host = ""

try:
    es.transport.close()
except:
    pass
es = Elasticsearch([host],port=9201)



## 3. Classes라는 인덱스 생성

In [3]:
INDEX_NAME = "classes"
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)
es.indices.create(index=INDEX_NAME)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'classes'}

In [4]:
# 잘 생성이 되었는지 확인
result = es.indices.get(index=INDEX_NAME)
pprint.pprint(result)

{'classes': {'aliases': {},
             'mappings': {},
             'settings': {'index': {'creation_date': '1607619766242',
                                    'number_of_replicas': '1',
                                    'number_of_shards': '1',
                                    'provided_name': 'classes',
                                    'routing': {'allocation': {'include': {'_tier_preference': 'data_content'}}},
                                    'uuid': 'evXEMDEAQgCPLgl06qvJ-Q',
                                    'version': {'created': '7100199'}}}}}


## 4. Mapping 추가
- RDB로 치면 Schema

In [5]:
body= {
    "properties" : {
        "title" : {
            "type" : "text"
        },
        "professor" : {
            "type" : "text"
        },
        "major" : {
            "type" : "text"
        },
        "semester" : {
            "type" : "text"
        },
        "student_count" : {
            "type" : "integer"
        },
        "unit" : {
            "type" : "integer"
        },
        "rating" : {
            "type" : "integer"
        },
        "submit_date" : {
            "type" : "date",
            "format" : "yyyy-MM-dd"
        },
        "school_location" : {
            "type" : "geo_point"
        }
    }
}
es.indices.put_mapping(index=INDEX_NAME,body=body)

{'acknowledged': True}

In [6]:
#잘 추가되었는지 확인
result = es.indices.get(index=INDEX_NAME)
pprint.pprint(result)

{'classes': {'aliases': {},
             'mappings': {'properties': {'major': {'type': 'text'},
                                         'professor': {'type': 'text'},
                                         'rating': {'type': 'integer'},
                                         'school_location': {'type': 'geo_point'},
                                         'semester': {'type': 'text'},
                                         'student_count': {'type': 'integer'},
                                         'submit_date': {'format': 'yyyy-MM-dd',
                                                         'type': 'date'},
                                         'title': {'type': 'text'},
                                         'unit': {'type': 'integer'}}},
             'settings': {'index': {'creation_date': '1607619766242',
                                    'number_of_replicas': '1',
                                    'number_of_shards': '1',
                                    'p

### Bulk 인덱싱을 위한 함수들
- https://kb.objectrocket.com/elasticsearch/how-to-use-python-helpers-to-bulk-load-data-into-an-elasticsearch-index

In [7]:
import uuid
def get_data_from_file(file_name):
    if "/" in file_name or chr(92) in file_name:
        file = open(file_name, encoding="utf8", errors='ignore')
    else:
        # use the script_path() function to get path if none is passed
        file = open(script_path() + str(file_name), encoding="utf8", errors='ignore')
    data = [line.strip().replace(" ","") for line in file]
    file.close()
    return data

def bulk_json_data(json_file, _index):
    json_list = get_data_from_file(json_file)
    for doc in json_list:
        # use a `yield` generator so that the data
        # isn't loaded inbto memory
        if '{"index"' in doc:
            _id = json.loads(doc)["index"]["_id"]
        if '{"index"' not in doc:
            yield {
                "_index": _index,
                "_id": _id,
                "_source": doc
            }

## Classes 데이터 인덱싱

In [9]:
import json
try:
    response = helpers.bulk(es, bulk_json_data("data/classes.json", INDEX_NAME))
    print ("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)


RESPONSE: (24, [])


In [10]:
#잘 들어갔는지 확인!
doc = es.get(index=INDEX_NAME, id=1)
pprint.pprint(doc)

{'_id': '1',
 '_index': 'classes',
 '_primary_term': 1,
 '_seq_no': 0,
 '_source': {'Professor': 'MinsukHeo',
             'major': 'ComputerScience',
             'rating': 5,
             'school_location': {'lat': 36.0, 'lon': -120.0},
             'semester': ['spring', 'fall'],
             'student_count': 100,
             'submit_date': '2016-01-02',
             'title': 'MachineLearning',
             'unit': 3},
 '_type': '_doc',
 '_version': 1,
 'found': True}


## Elasticsearch의 Search 함수 활용해보기

In [11]:
INDEX_NAME = "basketball"
try:
    response = helpers.bulk(es, bulk_json_data("data/bulk_basketball.json", INDEX_NAME))
    print ("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)


RESPONSE: (16, [])


In [12]:
res = es.search(index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [{'_id': '1',
                    '_index': 'basketball',
                    '_score': 1.0,
                    '_source': {'assists': 4,
                                'blocks': 5,
                                'name': 'StephenCurry',
                                'points': 30,
                                'rebounds': 3,
                                'submit_date': '2016-10-11',
                                'team': 'GoldenStatesWarriors'},
                    '_type': '_doc'},
                   {'_id': '2',
                    '_index': 'basketball',
                    '_score': 1.0,
                    '_source': {'assists': 8,
                                'blocks': 5,
                                'name': 'StephenCurry',
                                'points': 32,
                                'rebounds': 5,
                                'submit_date': '2016-10-13',
   

In [13]:
body = {
    "query": {
        "term": {
            "points":30
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [{'_id': '1',
                    '_index': 'basketball',
                    '_score': 1.0,
                    '_source': {'assists': 4,
                                'blocks': 5,
                                'name': 'StephenCurry',
                                'points': 30,
                                'rebounds': 3,
                                'submit_date': '2016-10-11',
                                'team': 'GoldenStatesWarriors'},
                    '_type': '_doc'}],
          'max_score': 1.0,
          'total': {'relation': 'eq', 'value': 1}},
 'timed_out': False,
 'took': 1}


## Elasticsearch의 Aggregation 활용해보기

In [14]:
INDEX_NAME = "basketball"
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)
try:
    response = helpers.bulk(es, bulk_json_data("data/simple_basketball.json", INDEX_NAME))
    print ("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)


RESPONSE: (2, [])


In [15]:
body = {
    "size" : 0,
    "aggs" : {
        "avg_score" : {
            "avg" : {
                "field" : "points"
            }
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'aggregations': {'avg_score': {'value': 25.0}},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 2}},
 'timed_out': False,
 'took': 2}


In [16]:
body = {
    "size" : 0,
    "aggs" : {
        "max_score" : {
            "max" : {
                "field" : "points"
            }
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'aggregations': {'max_score': {'value': 30.0}},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 2}},
 'timed_out': False,
 'took': 1}


In [17]:
body = {
    "size" : 0,
    "aggs" : {
        "min_score" : {
            "min" : {
                "field" : "points"
            }
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'aggregations': {'min_score': {'value': 20.0}},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 2}},
 'timed_out': False,
 'took': 1}


In [18]:
body = {
    "size" : 0,
    "aggs" : {
        "sum_score" : {
            "sum" : {
                "field" : "points"
            }
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'aggregations': {'sum_score': {'value': 50.0}},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 2}},
 'timed_out': False,
 'took': 1}


In [19]:
body ={
    "size" : 0,
    "aggs" : {
        "stats_score" : {
            "stats" : {
                "field" : "points"
            }
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'aggregations': {'stats_score': {'avg': 25.0,
                                  'count': 2,
                                  'max': 30.0,
                                  'min': 20.0,
                                  'sum': 50.0}},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 2}},
 'timed_out': False,
 'took': 1}


## Elasticsearch의 Bucket Aggregation 활용해보기

In [21]:
INDEX_NAME = "basketball"
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

es.indices.create(index=INDEX_NAME)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'basketball'}

In [22]:
#FieldData 가 True인 이유
#Term aggregation을 위해
body= {
    "properties" : {
        "team" : {
            "type" : "text",
            "fielddata" : True
        },
        "name" : {
            "type" : "text",
            "fielddata" : True
        },
        "points" : {
            "type" : "long"
        },
        "rebounds" : {
            "type" : "long"
        },
        "assists" : {
            "type" : "long"
        },
        "blocks" : {
            "type" : "long"
        },
        "submit_date" : {
            "type" : "date",
            "format" : "yyyy-MM-dd"
        }
    }
}

es.indices.put_mapping(index=INDEX_NAME,body=body)

{'acknowledged': True}

In [23]:
try:
    response = helpers.bulk(es, bulk_json_data("data/twoteam_basketball.json", INDEX_NAME))
    print ("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)


RESPONSE: (4, [])


In [24]:
body = {
    "size" : 0,
    "aggs" : {
        "players" : {
            "terms" : {
                "field" : "team"
            }
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'aggregations': {'players': {'buckets': [{'doc_count': 2, 'key': 'chicago'},
                                          {'doc_count': 2, 'key': 'la'}],
                              'doc_count_error_upper_bound': 0,
                              'sum_other_doc_count': 0}},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 4}},
 'timed_out': False,
 'took': 8}


In [25]:
body = {
    "size" : 0,
    "aggs" : {
        "team_stats" : {
            "terms" : {
                "field" : "team"
            },
            "aggs" : {
                "stats_score" : {
                    "stats" : {
                        "field" : "points"
                    }
                }
            }
        }
    }
}
res = es.search(body=body,index=INDEX_NAME)
pprint.pprint(res)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'aggregations': {'team_stats': {'buckets': [{'doc_count': 2,
                                              'key': 'chicago',
                                              'stats_score': {'avg': 25.0,
                                                              'count': 2,
                                                              'max': 30.0,
                                                              'min': 20.0,
                                                              'sum': 50.0}},
                                             {'doc_count': 2,
                                              'key': 'la',
                                              'stats_score': {'avg': 35.0,
                                                              'count': 2,
                                                              'max': 40.0,
                                                              'min': 30.0,
        

## Kibana 활용해보기

In [26]:
INDEX_NAME = "basketball"
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

es.indices.create(index=INDEX_NAME)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'basketball'}

In [27]:
#FieldData 가 True인 이유
#Term aggregation을 위해
body={
    "properties" : {
        "team" : {
            "type" : "text",
            "fielddata" : True
        },
        "name" : {
            "type" : "text",
            "fielddata" : True
        },
        "points" : {
            "type" : "long"
        },
        "rebounds" : {
            "type" : "long"
        },
        "assists" : {
            "type" : "long"
        },
        "blocks" : {
            "type" : "long"
        },
        "submit_date" : {
            "type" : "date",
            "format" : "yyyy-MM-dd"
        }
    }
}


es.indices.put_mapping(index=INDEX_NAME,body=body)

{'acknowledged': True}

In [28]:
try:
    response = helpers.bulk(es, bulk_json_data("data/bulk_basketball2.json", INDEX_NAME))
    print ("\nRESPONSE:", response)
except Exception as e:
    print("\nERROR:", e)


RESPONSE: (16, [])
