In [None]:
from pyspark.sql import SparkSession
import os
from datetime import datetime
from datetime import timedelta

# Build (or reuse) a local Spark session that runs on every available core.
# NOTE(review): 4040 is also Spark's default web UI port (spark.ui.port) —
# confirm that pinning the driver port to it is intended.
spark = (
    SparkSession.builder
    .appName("LocalPySparkExample")
    .master("local[*]")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.port", "4040")
    .getOrCreate()
)

# SparkContext handle used by the RDD and broadcast code further down.
sc = spark.sparkContext


In [None]:
# Print every (key, value) pair of the active Spark configuration.
print(spark.sparkContext.getConf().getAll())

In [None]:
# Input/output locations. The three CSV inputs live in the same directory.
_RESOURCE_DIR = "/Users/wh/Desktop/spark/HMG_W2/missions/W5/M2/docker/start_script/df_model/resources"
POST_PATH = f"{_RESOURCE_DIR}/naver_cafe_posts.csv"
COMMENT_PATH = f"{_RESOURCE_DIR}/naver_cafe_comments.csv"
MODEL_PATH = f"{_RESOURCE_DIR}/model_result.csv"
OUTPUT_PATH = "output"

# Wall-clock time at which this cell ran, kept to one-second resolution.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
COLLECTED_AT = datetime.now().strftime(_TIMESTAMP_FORMAT)


In [None]:
# Load the posts CSV (header row, inferred column types) and expose it as an
# RDD of Row objects.
post_rdd = spark.read.options(header=True, inferSchema=True).csv(POST_PATH).rdd

In [None]:
# Load the comments CSV (header row, inferred column types) as an RDD of Rows.
comment_rdd = spark.read.options(header=True, inferSchema=True).csv(COMMENT_PATH).rdd

In [None]:
# --- DataFrame variant of the model table --------------------------------
# Parsed by Spark's CSV reader with a header row and inferred column types.
model_df = spark.read.csv(MODEL_PATH, header=True, inferSchema=True)

# Split the table on the 'post_type' flag (1 vs. 0).
post_type_col = model_df['post_type']
model_df_t = model_df.filter(post_type_col == 1)
model_df_f = model_df.filter(post_type_col == 0)

# --- Raw-text RDD variant of the same file --------------------------------
raw_model_lines = sc.textFile(MODEL_PATH)

# The first line is the header; every line equal to it is dropped, and the
# remaining lines are split on commas into lists of string fields.
header = raw_model_lines.first()
model_rdd = (
    raw_model_lines
    .filter(lambda line: line != header)
    .map(lambda line: line.split(","))
)

# In this layout 'post_type' sits at index 4 and is still a string, so it is
# converted to int before comparing. Same 1 / 0 split as for the DataFrame.
model_rdd_t = model_rdd.filter(lambda fields: int(fields[4]) == 1)
model_rdd_f = model_rdd.filter(lambda fields: int(fields[4]) == 0)


In [None]:
def _collect_and_broadcast(dataset):
    """Collect *dataset* to the driver and broadcast the resulting list."""
    return sc.broadcast(dataset.collect())


# Broadcast both representations of the model table (Row lists from the
# DataFrames, string-field lists from the RDDs), split by post_type.
model_df_t_broadcast = _collect_and_broadcast(model_df_t)
model_df_f_broadcast = _collect_and_broadcast(model_df_f)
model_rdd_t_broadcast = _collect_and_broadcast(model_rdd_t)
model_rdd_f_broadcast = _collect_and_broadcast(model_rdd_f)

In [None]:
# Turn both RDDs into (post_id, row) pairs so they can be joined on post_id.
post_pair_rdd = post_rdd.keyBy(lambda post: post['post_id'])
comment_pair_rdd = comment_rdd.keyBy(lambda comment: comment['post_id'])

# Left outer join: one (post_id, (post_row, comment_row)) record per matching
# comment; posts without any comment appear once with None as the comment.
joined_rdd = post_pair_rdd.leftOuterJoin(comment_pair_rdd)

# Number of comments per post_id: each joined record counts 1 when it carries
# a comment and 0 otherwise, then the counts are summed per key.
comment_flags_rdd = joined_rdd.mapValues(lambda pair: int(bool(pair[1])))
comment_count_rdd = comment_flags_rdd.reduceByKey(lambda left, right: left + right)


In [None]:
def _with_comment_flag(pair):
    """Map a joined (post_row, comment_row_or_None) pair to (post_row, 0 or 1)."""
    post, comment = pair
    return (post, 1 if comment else 0)


def _merge_counts(left, right):
    """Keep the first post row and add up the comment counts."""
    return (left[0], left[1] + right[1])


def _flatten_post(entry):
    """Turn (post_id, (post_row, comment_count)) into one flat tuple."""
    post, comment_total = entry[1]
    return (
        post.post_id,
        post.title,
        post.content,
        post.likes,
        post.url,
        post.author,
        post.views,
        post.created_at,
        post.updated_at,
        comment_total,
    )


# Count the comments of every post and append that count to the post's own
# fields, producing one flat 10-element tuple per post.
result_rdd = (
    joined_rdd
    .mapValues(_with_comment_flag)
    .reduceByKey(_merge_counts)
    .map(_flatten_post)
)


In [None]:
def add_time(x):
    """Append a "created_at + 2 hours" timestamp to a flattened post tuple.

    Args:
        x: Indexable record with at least ten fields; ``x[7]`` is the
            ``created_at`` value, expected as a ``"%Y-%m-%d %H:%M:%S"`` string.

    Returns:
        An 11-tuple: the first ten fields of ``x`` unchanged, followed by the
        shifted timestamp formatted as ``"%Y-%m-%d %H:%M:%S"``, or ``None``
        when ``created_at`` is not a string in that format.

    Raises:
        IndexError: If ``x`` has fewer than ten fields.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    try:
        created_at = datetime.strptime(x[7], timestamp_format)
    except (TypeError, ValueError):
        # TypeError: created_at is not a string (e.g. None).
        # ValueError: created_at is a string in a different format.
        # Only these two are treated as "unparseable"; anything else is a
        # real error and must propagate instead of being silently swallowed.
        shifted = None
    else:
        shifted = (created_at + timedelta(hours=2)).strftime(timestamp_format)

    return tuple(x[i] for i in range(10)) + (shifted,)

# Extend every record with the "created_at + 2 hours" column (plain RDD map).
result_rdd = result_rdd.map(add_time)

In [None]:
# Keep only the records whose eleventh field (index 10) is not None.
result_rdd = result_rdd.filter(lambda record: record[10] is not None)

In [None]:
def find_impact_rdd(relative_time, cumulative_num, model_rows_t=None, model_rows_f=None):
    """Return the post_type-1 / post_type-0 ratio from the string-field model rows.

    Each model row is a list of strings such as
    ``['416', '16', '0.007359045763059523', '16', '1', '5']``: index 3 is
    compared with ``cumulative_num``, index 5 with ``relative_time`` and
    index 2 is the value that gets divided (presumably the pdf column —
    verify against the CSV header).

    Args:
        relative_time: Value matched against ``int(row[5])``.
        cumulative_num: Value matched against ``float(row[3])``.
        model_rows_t: Rows for post_type 1. Defaults to
            ``model_rdd_t_broadcast.value``.
        model_rows_f: Rows for post_type 0. Defaults to
            ``model_rdd_f_broadcast.value``.

    Returns:
        ``value_t / value_f`` for the first matching row of each table. When
        ``value_f`` is zero a tiny positive constant is used as the
        denominator instead. ``0.0`` when either table has no matching row.
    """
    # Stand-in denominator for a zero value_f, so the ratio stays finite.
    # NOTE(review): presumably the smallest non-zero value in the model —
    # confirm where this number comes from.
    zero_denominator_floor = 7.832224256932018e-62

    # Fall back to the broadcast tables when no rows are passed explicitly.
    if model_rows_t is None:
        model_rows_t = model_rdd_t_broadcast.value
    if model_rows_f is None:
        model_rows_f = model_rdd_f_broadcast.value

    def lookup(rows):
        # First row matching both keys, or None when nothing matches.
        for row in rows:
            if float(row[3]) == cumulative_num and int(row[5]) == relative_time:
                return float(row[2])
        return None

    value_t = lookup(model_rows_t)
    value_f = lookup(model_rows_f)

    # No model entry for this (cumulative_num, relative_time) pair.
    if value_t is None or value_f is None:
        return 0.0

    # Guard the denominator (not the numerator) against zero.
    if value_f != 0:
        return value_t / value_f
    return value_t / zero_denominator_floor


def find_impact(relative_time, cumulative_num, model_rows_t=None, model_rows_f=None):
    """Return the post_type-1 / post_type-0 pdf ratio from the collected model rows.

    Rows are looked up by key (``row['cumulative_num']``,
    ``row['relative_time']``, ``row['pdf']``), so Spark ``Row`` objects and
    plain dicts both work.

    Args:
        relative_time: Value matched against ``row['relative_time']``.
        cumulative_num: Value matched against ``row['cumulative_num']``.
        model_rows_t: Rows for post_type 1. Defaults to
            ``model_df_t_broadcast.value``.
        model_rows_f: Rows for post_type 0. Defaults to
            ``model_df_f_broadcast.value``.

    Returns:
        ``pdf_t / pdf_f`` for the first matching row of each table. When
        ``pdf_f`` is zero a tiny positive constant is used as the denominator
        instead. ``0.0`` when either table has no matching row (or the
        matching row's pdf is None).
    """
    # Stand-in denominator for a zero pdf_f, so the ratio stays finite.
    # NOTE(review): presumably the smallest non-zero pdf in the model —
    # confirm where this number comes from.
    zero_denominator_floor = 7.832224256932018e-62

    # Fall back to the broadcast tables when no rows are passed explicitly.
    if model_rows_t is None:
        model_rows_t = model_df_t_broadcast.value
    if model_rows_f is None:
        model_rows_f = model_df_f_broadcast.value

    def lookup(rows):
        # pdf of the first row matching both keys, or None when none matches.
        return next(
            (
                row['pdf']
                for row in rows
                if row['cumulative_num'] == cumulative_num
                and row['relative_time'] == relative_time
            ),
            None,
        )

    value_t = lookup(model_rows_t)
    value_f = lookup(model_rows_f)

    # No model entry for this pair: return 0 instead of failing the task.
    if value_t is None or value_f is None:
        return 0.0

    if value_f != 0:
        return value_t / value_f
    return value_t / zero_denominator_floor


def apply_model_a(x, impact_fn=None):
    """Score one post record with the DataFrame-backed model lookup.

    Args:
        x: Record with at least eleven fields, in the order post_id, title,
            content, likes, url, author, views, created_at, updated_at,
            comment_count, collected_at. ``created_at`` and ``collected_at``
            must be ``"%Y-%m-%d %H:%M:%S"`` strings.
        impact_fn: Callable ``(relative_time, cumulative_num) -> value`` used
            for the lookup. Defaults to ``find_impact``.

    Returns:
        A 12-tuple: the eleven input fields, with ``created_at`` and
        ``collected_at`` parsed into ``datetime`` objects, followed by the
        value returned by the lookup.

    Raises:
        ValueError: If a timestamp does not match the expected format or the
            record has fewer than eleven fields.
        TypeError: If a timestamp is not a string.
    """
    if impact_fn is None:
        impact_fn = find_impact

    (post_id, title, content, likes, url, author, views,
     created_at, updated_at, comment_count, collected_at) = x[:11]

    timestamp_format = "%Y-%m-%d %H:%M:%S"
    collected_at = datetime.strptime(collected_at, timestamp_format)
    created_at = datetime.strptime(created_at, timestamp_format)

    # Elapsed minutes between creation and collection, rounded down to a
    # multiple of 5. Dividing the whole timedelta keeps the days component,
    # which `.seconds` would silently drop for gaps of 24 hours or more.
    time_diff = ((collected_at - created_at) // timedelta(minutes=5)) * 5

    value = impact_fn(time_diff, comment_count)

    return (post_id, title, content, likes, url, author, views,
            created_at, updated_at, comment_count, collected_at, value)

def apply_model_b(x, impact_fn=None):
    """Score one post record with the string-field (RDD-backed) model lookup.

    Args:
        x: Record with at least eleven fields, in the order post_id, title,
            content, likes, url, author, views, created_at, updated_at,
            comment_count, collected_at. ``created_at`` and ``collected_at``
            must be ``"%Y-%m-%d %H:%M:%S"`` strings.
        impact_fn: Callable ``(relative_time, cumulative_num) -> value`` used
            for the lookup. Defaults to ``find_impact_rdd``.

    Returns:
        A 12-tuple: the eleven input fields, with ``created_at`` and
        ``collected_at`` parsed into ``datetime`` objects, followed by the
        value returned by the lookup.

    Raises:
        ValueError: If a timestamp does not match the expected format or the
            record has fewer than eleven fields.
        TypeError: If a timestamp is not a string.
    """
    if impact_fn is None:
        impact_fn = find_impact_rdd

    (post_id, title, content, likes, url, author, views,
     created_at, updated_at, comment_count, collected_at) = x[:11]

    timestamp_format = "%Y-%m-%d %H:%M:%S"
    collected_at = datetime.strptime(collected_at, timestamp_format)
    created_at = datetime.strptime(created_at, timestamp_format)

    # Elapsed minutes between creation and collection, rounded down to a
    # multiple of 5. Dividing the whole timedelta keeps the days component,
    # which `.seconds` would silently drop for gaps of 24 hours or more.
    time_diff = ((collected_at - created_at) // timedelta(minutes=5)) * 5

    value = impact_fn(time_diff, comment_count)

    return (post_id, title, content, likes, url, author, views,
            created_at, updated_at, comment_count, collected_at, value)

In [None]:
# Lazily score every record with the DataFrame-backed lookup variant.
model_applied_rdd_a = result_rdd.map(apply_model_a)

In [None]:
# Lazily score every record with the string-field (RDD-backed) lookup variant.
model_applied_rdd_b = result_rdd.map(apply_model_b)

In [None]:
# 3.3s
# Apply the model using the broadcast variables built from the RDD rows.
model_applied_rdd_b.collect()

In [None]:
# 12.5s
# Apply the model using the broadcast variables built from the DataFrame rows.
model_applied_rdd_a.collect()


In [None]:
# Shut down the SparkSession.
spark.stop()