In [3]:
# Java 설치
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Spark 3.5.0 다운로드
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# 압축 해제
!tar xf spark-3.5.0-bin-hadoop3.tgz

# PySpark 설치
!pip install -q pyspark findspark

In [4]:
import os

# Point the process at the JDK and at the unpacked Spark distribution.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.5.0-bin-hadoop3",
    }
)

import findspark

# Let findspark locate the Spark installation so pyspark can be imported.
findspark.init()

In [5]:
from pyspark.sql import SparkSession

# Build the session for this notebook (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName("SilverLayerTest")
    .getOrCreate()
)

# Bare expression so the session is displayed as the cell output.
spark

In [6]:
from google.colab import files
uploaded = files.upload()   # upload the CSV here (opens Colab's file picker)

Saving blog_posts_한남동_명소_20251122.csv to blog_posts_한남동_명소_20251122.csv


In [7]:
# Read the uploaded CSV: the first row is the header, quoted fields may span
# several lines, and the escape character is set to the double quote.
df = spark.read.csv(
    path="blog_posts_한남동_명소_20251122.csv",
    header=True,
    multiLine=True,
    escape='"',
)

# Show the schema, then the first three rows without truncating cell values.
df.printSchema()
df.show(n=3, truncate=False)

root
 |-- platform: string (nullable = true)
 |-- administrative_dong: string (nullable = true)
 |-- title: string (nullable = true)
 |-- link: string (nullable = true)
 |-- bloggername: string (nullable = true)
 |-- bloggerlink: string (nullable = true)
 |-- postdate: string (nullable = true)
 |-- content_raw: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- images: string (nullable = true)
 |-- videos: string (nullable = true)
 |-- like_count: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- post_id: string (nullable = true)
 |-- crawled_at: string (nullable = true)
 |-- status: string (nullable = true)

+--------+-------------------+-----------------------------------------------------+------------------------------------------------+-----------+-----------------------------------+--------+------------------------------------------------------------------------------------------------------------

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F

# getOrCreate() hands back the already-running session when one exists.
spark = (
    SparkSession.builder
    .appName("SilverLayerSimple")
    .getOrCreate()
)

# 1) Load the raw CSV (header row, multi-line quoted fields, '"' as escape).
df = spark.read.csv(
    path="/content/blog_posts_한남동_명소_20251122.csv",
    header=True,
    multiLine=True,
    escape='"',
)

# ==========================
# Silver 변환 시작
# ==========================

# HTML removal
def strip_html(col):
    """Return an expression with every ``<...>`` tag-like span replaced by a space.

    Args:
        col: The value handed to ``regexp_replace`` as its input; the callers
            in this notebook pass a Column such as ``col("title")``.

    Returns:
        The Column produced by ``regexp_replace``.
    """
    tag_pattern = "<[^>]+>"  # "<", one or more non-">" characters, then ">"
    return F.regexp_replace(col, tag_pattern, " ")

# Emoji removal (only basic characters are allowed): a negated character
# class, so it matches every character that is NOT in the whitelist below.
emoji_regex = (
    "[^"
    "\uAC00-\uD7A3"   # Hangul syllables
    "a-zA-Z0-9"       # ASCII letters and digits
    " .,!?"           # space and basic punctuation
    "]"
)

# Remove only a minimal set of UI noise; the alternatives are OR-ed together.
noise_regex = "|".join(
    [
        r"재생 \d+",        # Naver video player control
        r"좋아요 \d+",      # like counter, e.g. "좋아요 0"
        "접기/펴기",        # collapse/expand UI toggle
        "URL 복사",         # copy-URL button
        "이웃추가",         # add-neighbor button
        "본문 기타 기능",   # blog UI label
    ]
)

# Copyright notice: removed only up to the end of its own line.
copyright_regex = r"©[^\n]+"

# --------------------------
# Clean the title
# --------------------------
# Strip HTML tags and blank out every character outside the whitelist. Each
# removed character becomes a space, so whitespace runs are then collapsed to
# a single space (the same normalisation the body text receives) before the
# ends are trimmed.
df = df.withColumn(
    "title",
    F.trim(
        F.regexp_replace(
            F.regexp_replace(strip_html(F.col("title")), emoji_regex, " "),
            "\\s+",
            " ",
        )
    ),
)

# --------------------------
# Clean the content
# --------------------------
def _replace_each_with_space(column, patterns):
    """Apply ``regexp_replace(column, pattern, " ")`` for each pattern, in order."""
    for pattern in patterns:
        column = F.regexp_replace(column, pattern, " ")
    return column


# The order matters: the emoji whitelist also blanks newlines and URL
# punctuation, so the copyright and URL patterns have to run before it.
clean = _replace_each_with_space(
    strip_html(F.col("content_raw")),
    [
        noise_regex,       # minimal UI noise only (body text left intact)
        copyright_regex,   # copyright notice up to the end of its line
        "http\\S+",        # URLs
        emoji_regex,       # emoji and other non-whitelisted characters
        "\\s+",            # collapse repeated whitespace
    ],
)

df = df.withColumn("text_clean", F.trim(clean))

# --------------------------
# Word count
# --------------------------
# text_clean is whitespace-collapsed and trimmed, so splitting on one space
# yields the words. Two inputs need explicit handling:
#   * NULL text  -> size() returns -1 under Spark's default (non-ANSI) settings
#   * empty text -> split() yields [""], which size() reports as 1
# Both are reported as 0 words instead.
df = df.withColumn(
    "content_length",
    F.when(
        F.col("text_clean").isNull() | (F.col("text_clean") == ""),
        F.lit(0),
    ).otherwise(F.size(F.split(F.col("text_clean"), " "))),
)

# --------------------------
# hashtags: string -> array
# --------------------------
# Steps: drop the list punctuation ([ ] '), trim the outer blanks, normalise
# the spacing around commas, then split on ",".
# An input such as "[]" or a trailing comma makes split() produce "" elements;
# array_remove drops them so an empty tag list becomes [] rather than [""].
# A NULL input stays NULL.
df = df.withColumn(
    "hashtags",
    F.array_remove(
        F.split(
            F.regexp_replace(
                F.trim(F.regexp_replace(F.col("hashtags"), "[\\[\\]']", "")),
                "\\s*,\\s*",
                ",",
            ),
            ",",
        ),
        "",
    ),
)

# --------------------------
# Cast like/comment counts
# --------------------------
# Both columns are read from the CSV as strings; convert them to int in place.
df = (
    df
    .withColumn("like_count", F.col("like_count").cast("int"))
    .withColumn("comment_count", F.col("comment_count").cast("int"))
)

# --------------------------
# Engagement score
# --------------------------
# score = log(1 + likes + comments).
# NULL + x is NULL in Spark, so one missing count would wipe out the whole
# score. Each count is therefore treated as 0 when it is missing.
df = df.withColumn(
    "engagement_score",
    F.log1p(
        F.coalesce(F.col("like_count"), F.lit(0))
        + F.coalesce(F.col("comment_count"), F.lit(0))
    ),
)

# ==========================
# Select the Silver schema
# ==========================
# Keep only the identifier, the cleaned text fields and the derived metrics.
silver_df = df.select(
    [
        "post_id",
        "link",
        "title",
        "text_clean",
        "content_length",
        "hashtags",
        "like_count",
        "comment_count",
        "engagement_score",
    ]
)

# Preview the first five rows without truncating long values.
silver_df.show(n=5, truncate=False)

+------------+-------------------------------------------------+--------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [28]:
from google.colab import files

# Collect the Silver rows on the driver as a pandas DataFrame, write them to a
# CSV without the index column, then trigger a browser download of that file.
silver_pdf = silver_df.toPandas()
silver_pdf.to_csv(path_or_buf="silver_output.csv", index=False)
files.download("silver_output.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# Persist the Silver table as Parquet, replacing any earlier output at this path.
silver_df.write.parquet("blog_silver.parquet", mode="overwrite")