In [None]:
!pip install pyspark==3.5.2

In [None]:
!pip show pyspark

In [None]:
import os
import logging
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from transformers import pipeline

# 로깅 설정 (에러 레벨에서만 로그 출력)
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')

# 체크포인트 디렉터리 설정
checkpoint_dir = "/kaggle/working/checkpoints/kafka_to_mongo"
if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)

# .env 파일 로드
load_dotenv()

# Kafka 및 MongoDB 설정
config = {
    "kafka": {
        "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": os.getenv("SASL_USERNAME"),
        "sasl.password": os.getenv("SASL_PASSWORD"),
        "client.id": "json-serial-producer"
    },
    "mongodb": {
        "uri": os.getenv("URI"),
        "database": os.getenv("DATABASE"),
        "collection": os.getenv("COLLECTION")
    }
}

# Kafka 토픽 설정
topic = "raw_topic"

# 감정 분석 모델 로드
sentiment_pipeline = pipeline("text-classification", model="distilbert-base-uncased-finetuned-sst-2-english")

def analyze_sentiment(text):
    """
    텍스트 데이터에 대한 감정 분석 수행
    :param text: 분석할 텍스트 (문자열)
    :return: 긍정(positive) 또는 부정(negative) 라벨 반환
    """
    if text and isinstance(text, str):
        try:
            result = sentiment_pipeline(text)[0]
            return result['label']
        except Exception as e:
            logging.error(f"Error in sentiment analysis: {e}")
            return "Error"
    return "Empty or Invalid"

# 감정 분석 UDF
sentiment_udf = udf(analyze_sentiment, StringType())

def read_from_kafka_and_write_to_mongo(spark):
    """
    Kafka에서 스트리밍 데이터를 읽고 감정 분석을 수행한 후 MongoDB에 저장
    :param spark: SparkSession 객체
    """
    schema = StructType([
        StructField("review_id", StringType()),
        StructField("user_id", StringType()),
        StructField("business_id", StringType()),
        StructField("stars", FloatType()),
        StructField("useful", IntegerType()),
        StructField("funny", IntegerType()),
        StructField("cool", IntegerType()),
        StructField("text", StringType()),
        StructField("date", StringType())
    ])
    
    # Kafka 스트리밍 데이터 읽기
    stream_df = (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", config['kafka']['bootstrap.servers'])
            .option("subscribe", topic)
            .option("kafka.security.protocol", config['kafka']['security.protocol'])
            .option("kafka.sasl.mechanism", config['kafka']['sasl.mechanisms'])
            .option(
                "kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{config["kafka"]["sasl.username"]}" '
                f'password="{config["kafka"]["sasl.password"]}";'
            )
            .option("failOnDataLoss", "false")
            .load()
    )
    
    # Kafka 메시지에서 JSON 데이터 변환
    parsed_df = stream_df.select(from_json(col('value').cast("string"), schema).alias("data")).select("data.*")
    
    # 감정 분석 컬럼 추가
    enriched_df = parsed_df.withColumn("sentiment", sentiment_udf(col('text')))
    
    # MongoDB로 스트리밍 데이터 저장
    query = (
        enriched_df.writeStream
            .format("mongodb")
            .option("spark.mongodb.connection.uri", config['mongodb']['uri'])
            .option("spark.mongodb.database", config['mongodb']['database'])
            .option("spark.mongodb.collection", config['mongodb']['collection'])
            .option("checkpointLocation", checkpoint_dir)
            .outputMode("append")
            .start()
            .awaitTermination()
    )

# 실행
if __name__ == "__main__":
    # Spark 세션 생성
    spark = (
        SparkSession.builder
            .appName("KafkaStreamToMongo")
            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.mongodb.spark:mongo-spark-connector_2.12:10.4.0")
            .getOrCreate()
    )
    
    # Kafka에서 데이터를 읽고 MongoDB로 저장하는 함수
    read_from_kafka_and_write_to_mongo(spark)
