In [15]:
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

In [16]:
messages_demo_path = r"/home/user1/project/shop_Modeling/data/e-commerce_data/messages-demo.csv"
chunk_size = 5000
num_rows_to_read = 1000000

In [17]:
# 청크 단위로 데이터프레임을 읽어오기
chunks = pd.read_csv(messages_demo_path, chunksize=chunk_size)

# 청크를 담을 리스트 초기화
all_chunks = []

In [18]:
# 각 청크에 대한 처리
for chunk in chunks:
    # 청크를 리스트에 추가
    all_chunks.append(chunk)

    # 읽은 행의 총 수 계산
    rows_read = sum(chunk.shape[0] for chunk in all_chunks)

    # 읽은 행의 총 수가 목표 행의 수에 도달하면 중단
    if rows_read >= num_rows_to_read:
        break

# 모든 청크를 하나의 데이터프레임으로 통합
data = pd.concat(all_chunks, ignore_index=True)

In [19]:
# 데이터프레임의 크기 확인
print("데이터프레임의 크기:", data.shape)

데이터프레임의 크기: (1000000, 32)


In [20]:
data.head()

Unnamed: 0,id,message_id,campaign_id,message_type,client_id,channel,category,platform,email_provider,stream,...,is_soft_bounced,soft_bounced_at,is_complained,complained_at,is_blocked,blocked_at,is_purchased,purchased_at,created_at,updated_at
0,3527358,3f6aaad3-bab7-4886-b083-fe8c1f210066,31,transactional,1515915625489833514,email,,,mail.ru,desktop,...,f,,f,,f,,t,2021-05-06 16:40:38,2023-04-27 08:55:05.883908,2023-04-27 08:57:33.080129
1,3527619,0e670ecc-4549-44f6-86ed-469682d34837,32,transactional,1515915625489220445,email,,,yandex.ru,desktop,...,f,,f,,f,,f,,2023-04-27 08:55:06.265821,2023-04-27 08:56:18.60223
2,3527980,276b25cf-1bda-4faf-b3a4-98e4161f9357,32,transactional,1515915625489854185,email,,,mail.ru,desktop,...,f,,f,,f,,f,,2023-04-27 08:55:06.777039,2023-04-27 08:56:19.112546
3,3528369,4545aff2-09b3-45e3-9abd-c680357e5429,32,transactional,1515915625489101550,email,,,mail.ru,desktop,...,f,,f,,f,,f,,2023-04-27 08:55:07.325906,2023-04-27 08:56:19.590637
4,3528648,5850858d-2dcf-4f31-a0d3-5db5649b17c4,32,transactional,1515915625490455948,email,,,mail.ru,desktop,...,f,,f,,f,,f,,2023-04-27 08:55:07.727792,2023-04-27 08:56:19.926474


In [21]:
data_colums = data.dtypes
print(data_colums)

id                         int64
message_id                object
campaign_id                int64
message_type              object
client_id                  int64
channel                   object
category                 float64
platform                  object
email_provider            object
stream                    object
date                      object
sent_at                   object
is_opened                 object
opened_first_time_at      object
opened_last_time_at       object
is_clicked                object
clicked_first_time_at     object
clicked_last_time_at      object
is_unsubscribed           object
unsubscribed_at           object
is_hard_bounced           object
hard_bounced_at           object
is_soft_bounced           object
soft_bounced_at           object
is_complained             object
complained_at             object
is_blocked                object
blocked_at               float64
is_purchased              object
purchased_at              object
created_at

In [22]:
# # Elasticsearch 서버 연결
es = Elasticsearch([{'host': 'localhost', 'port': 9200, 'scheme': 'http'}])

In [23]:
# Elasticsearch 서버에 ping 요청 보내기
if es.ping():
    print("Elasticsearch 서버에 성공적으로 연결되었습니다.")
else:
    print("Elasticsearch 서버에 연결하지 못했습니다.")

Elasticsearch 서버에 성공적으로 연결되었습니다.


In [24]:
# 인덱스가 이미 존재하는 경우 삭제
if es.indices.exists(index="e_messages_demo"):
    es.indices.delete(index="e_messages_demo")

In [25]:
# Elasticsearch 인덱스 매핑 설정
e_messages_demo = 'e_messages_demo'  # 원하는 색인명으로 변경

# 인덱스 매핑 설정
mapping = {
    "mappings": {
        "properties": {
            "id": {"type": "integer"},
            "message_id": {"type": "keyword"},
            "campaign_id": {"type": "integer"},
            "message_type": {"type": "keyword"},
            "client_id": {"type": "integer"},
            "channel": {"type": "keyword"},
            "category": {"type": "float"},
            "platform": {"type": "keyword"},
            "email_provider": {"type": "keyword"},
            "stream": {"type": "keyword"},
            "date": {"type": "date", "format": "yyyy-MM-dd"},
            "sent_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_opened": {"type": "boolean"},
            "opened_first_time_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "opened_last_time_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_clicked": {"type": "boolean"},
            "clicked_first_time_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "clicked_last_time_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_unsubscribed": {"type": "boolean"},
            "unsubscribed_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_hard_bounced": {"type": "boolean"},
            "hard_bounced_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_soft_bounced": {"type": "boolean"},
            "soft_bounced_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_complained": {"type": "boolean"},
            "complained_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_blocked": {"type": "boolean"},
            "blocked_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "is_purchased": {"type": "boolean"},
            "purchased_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "created_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "updated_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"}
        }
    }
}

In [26]:
# 인덱스 생성
es.indices.create(index="e_messages_demo", body=mapping, ignore=400)

  es.indices.create(index="e_messages_demo", body=mapping, ignore=400)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'e_messages_demo'})

In [27]:
# Elasticsearch에 이미 색인된 데이터 삭제
es.indices.delete(index="e_messages_demo", ignore=[400, 404])

  es.indices.delete(index="e_messages_demo", ignore=[400, 404])


ObjectApiResponse({'acknowledged': True})

In [28]:
# documents 리스트 초기화
documents = []

# 각 청크에 대한 처리
for chunk in chunks:
    for _, row in chunk.iterrows():
        document = row.to_dict()

        # NaN이 아닌 경우에만 날짜를 ISO 형식으로 변환
        for key, value in document.items():
            if pd.notna(value) and key.endswith('_at'):
                document[key] = pd.to_datetime(value).isoformat()

        # NaN을 None으로 대체
        document = {key: (None if pd.isna(value) else value) for key, value in document.items()}

        documents.append({
            "_op_type": "index",
            "_index": e_messages_demo,
            "_source": document
        })

    # Elasticsearch에 색인
    try:
        success, failed = bulk(es, documents)
        print(f"Indexed {success} documents successfully.")
        if failed:
            for i, failure in enumerate(failed):
                print(f"Failed to index document {i + 1}: {failure}")
    except BulkIndexError as e:
        print(f"Error indexing documents: {e}")

print("색인이 완료되었습니다.")

Indexed 5000 documents successfully.
Indexed 10000 documents successfully.
Indexed 15000 documents successfully.
Indexed 20000 documents successfully.
Indexed 25000 documents successfully.
Indexed 30000 documents successfully.
Indexed 35000 documents successfully.
Indexed 40000 documents successfully.
Indexed 45000 documents successfully.
Indexed 50000 documents successfully.
Indexed 55000 documents successfully.
Indexed 60000 documents successfully.
Indexed 65000 documents successfully.
Indexed 70000 documents successfully.
Indexed 75000 documents successfully.
Indexed 80000 documents successfully.
Indexed 85000 documents successfully.
Indexed 90000 documents successfully.
Indexed 95000 documents successfully.
Indexed 100000 documents successfully.
Indexed 105000 documents successfully.


KeyboardInterrupt: 