In [1]:
import pandas as pd
rating_df = pd.read_csv("../data/test_rating.csv")

In [2]:
# 고유한 user_idx 목록 추출
unique_users = set(rating_df['user_idx'])

# 상위 2만 개 사용자 선택
selected_users = list(unique_users)[:10000]

# 선택된 사용자에 해당하는 데이터프레임 슬라이스
sliced_df = rating_df[rating_df['user_idx'].isin(selected_users)]

# 결과 확인
print(f"원래 사용자 수: {len(unique_users)}")
print(f"슬라이스된 사용자 수: {len(set(sliced_df['user_idx']))}")
print(f"슬라이스된 데이터프레임 크기: {sliced_df.shape}")
sliced_df.head()

원래 사용자 수: 28921
슬라이스된 사용자 수: 10000
슬라이스된 데이터프레임 크기: (34284, 4)


Unnamed: 0,review_idx,user_idx,product_idx,review_rating
2,3,861,1690,4
3,4,5391,3953,5
9,10,6266,1345,4
12,13,4427,61,2
19,20,8323,769,5


In [3]:
user_item_matrix = sliced_df.pivot_table(
        index='user_idx',
        columns='product_idx',
        values='review_rating',
        fill_value=0
)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import StringType, ArrayType, FloatType, StructType, StructField
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
from elasticsearch import Elasticsearch
from datetime import datetime
import pandas as pd
import time
import logging
import json

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("elasticsearch")

In [5]:
from pyspark.sql import SparkSession
# spark = SparkSession.builder \
#     .master("spark://spark-master:7077") \
#     .appName("JupyterSparkApp") \
#     .config("spark.jars", "/usr/local/spark/jars/elasticsearch-spark-30_2.12-8.11.0.jar") \
#     .getOrCreate()
# 새 SparkSession 초기화
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("Write to Elasticsearch") \
    .config("spark.jars", "/usr/local/spark/jars/elasticsearch-spark-30_2.12-8.11.0.jar") \
    .config("spark.driver.extraClassPath", "/usr/local/spark/jars/elasticsearch-spark-30_2.12-8.11.0.jar") \
    .config("spark.executor.extraClassPath", "/usr/local/spark/jars/elasticsearch-spark-30_2.12-8.11.0.jar") \
    .getOrCreate()

In [6]:
user_idx_list = user_item_matrix.index
item_idx_list = user_item_matrix.columns

user_matrix_sparse = csr_matrix(user_item_matrix.values)
item_matrix_sparse = csr_matrix(user_item_matrix.T.values)

n_components = 128  # 차원 수 (하이퍼파라미터, 튜닝 가능)
user_svd = TruncatedSVD(n_components=n_components, random_state=42)
item_svd = TruncatedSVD(n_components=n_components, random_state=42)
User_svd = user_svd.fit_transform(user_matrix_sparse)
Item_svd = item_svd.fit_transform(item_matrix_sparse)

In [7]:
def convert_to_spark_df(spark, pandas_df, entity_type):
    """Pandas DataFrame을 Spark DataFrame으로 변환"""
    # 벡터 컬럼을 문자열로 변환 (JSON 형식)
    pandas_df['vector_json'] = pandas_df['vector'].apply(lambda x: json.dumps(x.tolist()))

    # Spark DataFrame 스키마 정의
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("vector_json", StringType(), False),
        StructField("last_updated", StringType(), False)
    ])

    # Spark DataFrame 생성
    spark_df = spark.createDataFrame(
        pandas_df[['id', 'vector_json', 'last_updated']],
        schema=schema
    )

    # 벡터 문자열을 배열로 변환하는 UDF 정의
    vector_to_array_udf = udf(lambda x: json.loads(x), ArrayType(FloatType()))

    # UDF 적용하여 벡터 변환
    result_df = spark_df.withColumn("vector", vector_to_array_udf(col("vector_json"))) \
        .drop("vector_json")

    return result_df


def es_write_with_spark(df, index_name):
    """Spark DataFrame을 Elasticsearch에 저장"""
    print(f"Writing records to Elasticsearch index '{index_name}'")

    start_time = time.time()
    # Elasticsearch에 데이터 쓰기 (Spark Elasticsearch connector 사용)
    df.write \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "elasticsearch") \
        .option("es.port", "9200") \
        .option("es.net.http.header.Accept", "application/vnd.elasticsearch+json; compatible-with=8") \
        .option("es.net.http.header.Content-Type", "application/json") \
        .option("es.mapping.id", "id") \
        .option("es.write.operation", "upsert") \
        .option("es.batch.size.entries", "1000") \
        .option("es.batch.write.retry.count", "3") \
        .option("es.batch.write.retry.wait", "10s") \
        .option("es.nodes.wan.only", "true") \
        .mode("append") \
        .save(index_name)

    print(f"Finished writing to Elasticsearch index '{index_name}'")

    end_time = time.time()
    logger.info(f"write_with_spark in {end_time - start_time:.2f} seconds")


def create_vector_indices(es, n_components):
    """Elasticsearch 벡터 인덱스 생성"""
    # 벡터 인덱스 매핑 설정
    vector_mapping = {
        "properties": {
            "vector": {
                "type": "dense_vector",
                "dims": n_components,
                "index": True,
                "similarity": "cosine",
                "index_options": {
                    "type": "hnsw",  # HNSW 알고리즘 사용
                    "m": 16,  # 최대 연결 수 (기본값)
                    "ef_construction": 200  # 인덱스 생성 시 효율성 파라미터
                }
            },
            "last_updated": {
                "type": "date"
            }
        }
    }

    # 인덱스 설정
    vector_settings = {
        "index": {
            "number_of_shards": 3,
            "number_of_replicas": 1
        }
    }

    # 인덱스 목록
    indices = ["user-based", "item-based"]

    for index_name in indices:
        # 인덱스가 존재하면 삭제 후 다시 생성
        if es.indices.exists(index=index_name):
            print(f"Deleting existing index: {index_name}")
            es.indices.delete(index=index_name)

        # 인덱스 생성
        print(f"Creating index: {index_name}")
        es.indices.create(
            index=index_name,
            mappings=vector_mapping,
            settings=vector_settings
        )
        print(f"Finished Creating index: {index_name}")

In [8]:
# 엘라스틱서치 연결
es = Elasticsearch(
    ["http://elasticsearch:9200"],
    headers={
        "Accept": "application/vnd.elasticsearch+json; compatible-with=8",
        "Content-Type": "application/json"
    }
)

In [9]:
# 인덱스 생성
create_vector_indices(es, n_components)

INFO:elastic_transport.transport:HEAD http://elasticsearch:9200/user-based [status:200 duration:0.342s]


Deleting existing index: user-based


INFO:elastic_transport.transport:DELETE http://elasticsearch:9200/user-based [status:200 duration:0.920s]


Creating index: user-based


INFO:elastic_transport.transport:PUT http://elasticsearch:9200/user-based [status:200 duration:1.492s]
INFO:elastic_transport.transport:HEAD http://elasticsearch:9200/item-based [status:200 duration:0.079s]


Finished Creating index: user-based
Deleting existing index: item-based


INFO:elastic_transport.transport:DELETE http://elasticsearch:9200/item-based [status:200 duration:0.228s]


Creating index: item-based


INFO:elastic_transport.transport:PUT http://elasticsearch:9200/item-based [status:200 duration:0.968s]


Finished Creating index: item-based


In [10]:
start_time = time.time()

# 현재 시간
current_time = datetime.now().isoformat()

# Pandas DataFrame 생성 (User 벡터)
user_df = pd.DataFrame({
    'id': user_idx_list,
    'vector': list(User_svd),
    'last_updated': current_time
})

# Pandas DataFrame 생성 (Item 벡터)
item_df = pd.DataFrame({
    'id': item_idx_list,
    'vector': list(Item_svd),
    'last_updated': current_time
})

# Spark DataFrame으로 변환
user_spark_df = convert_to_spark_df(spark, user_df, "user")
item_spark_df = convert_to_spark_df(spark, item_df, "item")

# Elasticsearch에 저장
es_write_with_spark(user_spark_df, "user-based")
es_write_with_spark(item_spark_df, "item-based")

end_time = time.time()
logger.info(f"Spark finished time in {end_time - start_time:.2f} seconds")
print("Vector update completed with PySpark")

# Spark 세션 종료
spark.stop()

Writing records to Elasticsearch index 'user-based'


INFO:elasticsearch:write_with_spark in 54.21 seconds


Finished writing to Elasticsearch index 'user-based'
Writing records to Elasticsearch index 'item-based'


INFO:elasticsearch:write_with_spark in 10.63 seconds
INFO:elasticsearch:Spark finished time in 70.93 seconds


Finished writing to Elasticsearch index 'item-based'
Vector update completed with PySpark


Writing records to Elasticsearch index 'user-based'


INFO:elasticsearch:write_with_spark in 56.12 seconds


Finished writing to Elasticsearch index 'user-based'
Writing records to Elasticsearch index 'item-based'


INFO:elasticsearch:write_with_spark in 14.20 seconds
INFO:elasticsearch:Spark finished time in 84.98 seconds


Finished writing to Elasticsearch index 'item-based'
Vector update completed with PySpark


In [11]:
# 1. 인덱스 리스트 조회
print("=== 인덱스 리스트 ===")
indices = es.cat.indices(format="json")
for index in indices:
    index_name = index['index']
    doc_count = index['docs.count']
    print(f"인덱스: {index_name}, 문서 수: {doc_count}")


INFO:elastic_transport.transport:GET http://elasticsearch:9200/_cat/indices?format=json [status:200 duration:0.167s]


=== 인덱스 리스트 ===
인덱스: user-based, 문서 수: 10000
인덱스: employees, 문서 수: 1
인덱스: item-based, 문서 수: 4186


In [13]:
user_spark_df.show(10)

Py4JJavaError: An error occurred while calling o70.showString.
: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2390)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [15]:
spark.stop()