In [13]:
#-*- coding: utf-8 -*-
from elasticsearch import Elasticsearch,JSONSerializer, helpers
import numpy as np
import json

# https://github.com/elastic/elasticsearch-py/issues/378
class NumpyEncoder(JSONSerializer):
    """JSON serializer that converts NumPy values into native Python types.

    See https://github.com/elastic/elasticsearch-py/issues/378 for background.
    """

    def default(self, obj):
        """Return a JSON-serializable equivalent of ``obj``.

        NumPy booleans, integers, floats and arrays are converted to ``bool``,
        ``int``, ``float`` and (nested) ``list`` respectively. Anything else is
        delegated to ``JSONSerializer.default``.
        """
        # The abstract scalar bases are used instead of listing concrete types:
        # they cover every width, and avoid the ``np.float_`` alias, which was
        # removed in NumPy 2.0 and would raise AttributeError here.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return JSONSerializer.default(self, obj)
        
# Create the client object; NumpyEncoder is used as the request serializer.
es = Elasticsearch(
    hosts="122.32.196.201:9200",
    port=9200,
    serializer=NumpyEncoder(),
)

# ES cluster health check (the returned dict is the cell output).
es.cluster.health()

{'cluster_name': 'elasticsearch',
 'status': 'yellow',
 'timed_out': False,
 'number_of_nodes': 1,
 'number_of_data_nodes': 1,
 'active_primary_shards': 3,
 'active_shards': 3,
 'relocating_shards': 0,
 'initializing_shards': 0,
 'unassigned_shards': 3,
 'delayed_unassigned_shards': 0,
 'number_of_pending_tasks': 0,
 'number_of_in_flight_fetch': 0,
 'task_max_waiting_in_queue_millis': 0,
 'active_shards_percent_as_number': 50.0}

In [53]:
# encoding: utf8
from __future__ import print_function
import multiprocessing
from joblib import Parallel, delayed

import argparse
import tqdm
import os
import glob
import re
import pandas as pd
 
# CSV columns to load, keyed by trade type.
COLS = {
    'apt-trade': [
        'si', 'gu', 'sigungu', 'legal_dong', 'apt_name',
        'transaction_amount', 'transaction_date', 'description',
        'transaction_year', 'transaction_month', 'floor',
        'dedicated_area', 'year_of_construction',
    ],
    'apt-rent': [
        'si', 'gu', 'sigungu', 'legal_dong', 'apt_name',
        'transaction_amount', 'transaction_date',
        'transaction_year', 'transaction_month', 'floor',
        'dedicated_area', 'monthly_rent', 'deposit',
    ],
}


def preprocessing(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column dtypes and derive a ``message`` column.

    The frame is modified in place and the same object is returned.

    Args:
        df: Frame containing at least ``transaction_amount``, ``apt_name``,
            ``transaction_date`` (formatted ``%Y-%m-%d %H:%M:%S``),
            ``transaction_year``, ``transaction_month``, ``floor`` and
            ``dedicated_area``. ``year_of_construction`` and ``monthly_rent``
            are cast to int only when present.

    Returns:
        ``df`` with converted dtypes and an added ``message`` column holding
        the Hangul/digit runs of ``apt_name`` joined by single spaces.
    """
    # Compile once instead of once per row inside the lambda.
    hangul_or_digits = re.compile('[가-힣0-9]+')

    # The amount is only cast to float; no rescaling is applied.
    df['transaction_amount'] = df['transaction_amount'].astype(float)
    df['message'] = df['apt_name'].apply(
        lambda name: ' '.join(hangul_or_digits.findall(name)))
    # Source values look like "2016-05-26 0:00:00".
    df['transaction_date'] = pd.to_datetime(
        df['transaction_date'], format="%Y-%m-%d %H:%M:%S")
    df['transaction_year'] = df['transaction_year'].astype(object)
    df['transaction_month'] = df['transaction_month'].astype(object)
    df['floor'] = df['floor'].astype(int)
    df['dedicated_area'] = df['dedicated_area'].astype(float)

    # These columns exist for only one trade type each, so each is guarded;
    # an unconditional cast raises KeyError on a frame that lacks the column.
    for optional_int_col in ('year_of_construction', 'monthly_rent'):
        if optional_int_col in df.columns:
            df[optional_int_col] = df[optional_int_col].astype(int)
    return df

In [54]:
# Every entry under ../data_in whose path does not contain "tar".
area_code_dirs = [
    entry
    for entry in glob.glob(os.path.join('../data_in', '*'))
    if 'tar' not in entry
]

In [55]:
import tqdm

trade_type = 'apt-trade'

# Frames are accumulated across every area-code directory. Rebuilding `df`
# inside the loop would keep only the last directory's rows.
frames = []
for area_code_dir in area_code_dirs:
    # basename/normpath instead of split('/') so the separator is not assumed.
    area_code = os.path.basename(os.path.normpath(area_code_dir))
    print(area_code_dir)
    filelist = glob.glob(os.path.join(area_code_dir, '*.csv'))
    print(filelist[:1])
    for filepath in tqdm.tqdm(filelist):
        frames.append(pd.read_csv(filepath, usecols=COLS[trade_type]))

# pd.concat raises an opaque ValueError on an empty list; fail with context.
if not frames:
    raise FileNotFoundError(
        'no CSV files found under: {}'.format(area_code_dirs))

df = pd.concat(frames, axis=0)
df = preprocessing(df)

10%|▉         | 18/182 [00:00<00:00, 174.05it/s]../data_in/41135
['../data_in/41135/201604.csv']
100%|██████████| 182/182 [00:01<00:00, 181.27it/s]


In [56]:
# Display the column labels of the preprocessed frame.
df.columns

Index(['transaction_amount', 'year_of_construction', 'transaction_year',
       'legal_dong', 'apt_name', 'transaction_month', 'dedicated_area',
       'floor', 'si', 'gu', 'sigungu', 'transaction_date', 'description',
       'message'],
      dtype='object')

In [58]:
from elasticsearch import Elasticsearch,JSONSerializer, helpers

# Bulk-index one document per DataFrame row into the 'apt-trade' index.
# The orient must be spelled 'records': the abbreviated 'record' was
# deprecated in pandas 1.x and is rejected by pandas 2.0+.
response = helpers.bulk(es, df.to_dict(orient='records'), index='apt-trade')

In [59]:
# Fetch documents from the index with a match-all query.
es.search(
    index='apt-trade',
    body={
        "query": {
            "match_all": {},
        },
    },
)

{'took': 7148,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': 1.0,
  'hits': [{'_index': 'apt-trade',
    '_type': '_doc',
    '_id': 'WONDv3gBsTK-DR3xqJ9d',
    '_score': 1.0,
    '_source': {'transaction_amount': 52500.0,
     'year_of_construction': 1993,
     'transaction_year': 2016,
     'legal_dong': '분당동',
     'apt_name': '샛별마을(라이프)',
     'transaction_month': 4,
     'dedicated_area': 84.99,
     'floor': 18,
     'si': '경기도',
     'gu': '성남시 분당구',
     'sigungu': '경기도 성남시 분당구',
     'transaction_date': '2016-04-01T00:00:00',
     'description': '경기도 성남시 분당구 분당동 샛별마을 라이프',
     'message': '샛별마을 라이프'}},
   {'_index': 'apt-trade',
    '_type': '_doc',
    '_id': 'WeNDv3gBsTK-DR3xqJ9d',
    '_score': 1.0,
    '_source': {'transaction_amount': 46000.0,
     'year_of_construction': 1993,
     'transaction_year': 2016,
     'legal_dong': '분당동',
     'apt_name': '장안타운(건