In [1]:
from pyspark.sql import SparkSession

# Access the driver on port 4040.
spark = (
    SparkSession.builder
    .appName("JupyterSparkSession2")
    .master("spark://192.168.0.63:7077")
    # Optional resource settings, currently disabled. They are kept ahead of
    # getOrCreate() because builder options must be set before the session
    # is created.
    # .config("spark.driver.memory", "512m")
    # .config("spark.executor.memory", "512m")
    # .config("spark.executor.cores", "1")
    # .config("spark.python.worker.memory", "512m")
    # .config("spark.driver.maxResultSize", "512m")
    .getOrCreate()
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

def write_batch_to_postgres(df, epoch_id):
    """Append one micro-batch to the ``click_events_results`` PostgreSQL table.

    Intended as a ``foreachBatch`` sink callback. Connection settings are read
    from the module-level names ``postgres_host``, ``postgres_port``,
    ``postgres_db``, ``postgres_user`` and ``postgres_password``, which must
    be defined before the first batch is processed.

    Args:
        df: The DataFrame holding the rows of the current micro-batch.
        epoch_id: Batch identifier supplied by the streaming engine; unused.
    """
    # Stamp every row with the time of the write. withColumn replaces a
    # column of the same name, so an incoming "created_at" is overwritten.
    stamped = df.withColumn("created_at", current_timestamp())

    jdbc_options = {
        "url": f"jdbc:postgresql://{postgres_host}:{postgres_port}/{postgres_db}",
        "dbtable": "click_events_results",
        "user": postgres_user,
        "password": postgres_password,
        "driver": "org.postgresql.Driver",
    }

    # Write the batch through JDBC, adding to whatever the table already holds.
    stamped.write.format("jdbc").options(**jdbc_options).mode("append").save()

# Create the Spark session.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists; in that case settings such as spark.jars.packages may not take
# effect. Confirm that no earlier session is active in this kernel.
spark = (
    SparkSession.builder
    .appName("ClickEventProcessing")
    # Kafka source connector and PostgreSQL JDBC driver.
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.postgresql:postgresql:42.2.18",
    )
    # Same jar directory on both the driver and the executor classpath.
    .config("spark.driver.extraClassPath", "/opt/bitnami/spark/jars/*")
    .config("spark.executor.extraClassPath", "/opt/bitnami/spark/jars/*")
    # Checkpoint location used by streaming queries.
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint")
    .getOrCreate()
)

# Kafka and PostgreSQL connection settings.
# NOTE(review): the credentials are hard-coded here; consider loading them
# from the environment or a secrets store.
kafka_broker = "kafka-broker-local:29092"
postgres_host = "postgres"
postgres_port = "5432"
postgres_db = "postgres"
postgres_user = "username"
postgres_password = "password"

# Click-event schema: three nullable string fields. "timestamp" is parsed
# as a string here and converted to a real timestamp after the JSON decode.
click_event_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("user_id", "ad_id", "timestamp")
    ]
)

# Read the click-event stream from Kafka: decode the message value as a JSON
# string, flatten the parsed struct into top-level columns, then convert the
# "timestamp" string into a timestamp column.
click_events = (
    spark.readStream
    .format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": kafka_broker,
            "subscribe": "click_events",
        }
    )
    .load()
    .select(
        from_json(col("value").cast("string"), click_event_schema).alias("data")
    )
    .select("data.*")
    .withColumn("timestamp", to_timestamp("timestamp"))
)

# Read the user information from PostgreSQL (a static, non-streaming read).
user_info = (
    spark.read
    .format("jdbc")
    .options(
        url=f"jdbc:postgresql://{postgres_host}:{postgres_port}/{postgres_db}",
        dbtable="users",
        user=postgres_user,
        password=postgres_password,
        driver="org.postgresql.Driver",
    )
    .load()
)

# Join the click events with the user information on the shared user_id column.
joined_data = click_events.join(user_info, on="user_id")

# Print the joined schema so it can be checked by eye.
print("Joined Data Schema:")
joined_data.printSchema()

# Store the results in PostgreSQL: every micro-batch is handed to
# write_batch_to_postgres.
query = (
    joined_data.writeStream
    .foreachBatch(write_batch_to_postgres)
    .outputMode("update")
    .start()
)

# Block this cell until the streaming query stops or fails.
query.awaitTermination()

Joined Data Schema:
root
 |-- user_id: string (nullable = true)
 |-- ad_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- created_at: timestamp (nullable = true)

