In [1]:
# ┌── 셀 1: SparkSession 생성 (한 번만 실행)
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("MongoDB-ES-Demo")
         .master("local[1]")
         .config("spark.ui.enabled", "false")
         .getOrCreate())
spark.sparkContext.setLogLevel("WARN")

In [2]:
# ┌── 셀 2: PyMongo 로 MongoDB에서 문서 읽기
from pymongo import MongoClient
import json
import os

# 1) MongoDB 에서 데이터 읽기
client = MongoClient("mongodb://127.0.0.1:27017", serverSelectionTimeoutMS=2000)
docs = list(client.demo.users.find())
for d in docs: d.pop("_id", None)

# 2) JSON Lines 파일로 저장 (한 줄에 한 문서씩)
os.makedirs("data", exist_ok=True)
with open("data/users.jsonl", "w", encoding="utf-8") as f:
    for d in docs:
        f.write(json.dumps(d, ensure_ascii=False) + "\n")

In [3]:
# ┌── 셀 3: spark.read.json 으로 파일에서 바로 DataFrame 생성
mongo_df = spark.read.json("data/users.jsonl")
mongo_df.show(truncate=False)

+---+-----+-------+
|age|name |user_id|
+---+-----+-------+
|30 |Alice|U1     |
|25 |Bob  |U2     |
|28 |Carol|U3     |
+---+-----+-------+



In [9]:
# ┌── 셀 4: elasticsearch-py 로 ES에 색인 & 재조회
from elasticsearch import Elasticsearch, helpers

# 1) bulk 색인
es = Elasticsearch("http://127.0.0.1:9200")
actions = [
    {"_index": "users_index", "_id": d["user_id"], "_source": d}
    for d in docs
]
helpers.bulk(es, actions)

# 2) 색인 확인
res = es.search(
    index="users_index",
    query={    "range": { "age": { "gte": 29, "lte": 30 } } }
)
print([hit["_source"] for hit in res["hits"]["hits"]])

[{'user_id': 'U1', 'name': 'Alice', 'age': 30}]


In [5]:
# ┌── 셀 5: 세션 종료
spark.stop()