In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col,coalesce, when, trim,to_date, lower, to_timestamp, lit
from pyspark.sql.types import StructType, StructField, StringType, DateType
import boto3

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [2]:
def getSparkSession() -> SparkSession:
    """Spark 세션을 생성하고 반환하는 함수"""
    spark = SparkSession.builder \
        .appName("S3-Spark-Integration") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()
    return spark

# master 사용하면 오류... 왜??
# def getSparkSession() -> SparkSession:
#     """Spark 세션을 생성하고 반환하는 함수"""
#     spark = SparkSession.builder \
#         .master("local[*]")\
#         .appName("S3-Spark-Integration") \
#         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
#         .getOrCreate()
#     return spark

In [3]:
def list_parquet_files(s3_bucket: str, s3_prefix: str):
    """S3 특정 경로에서 .parquet 파일들의 전체 경로를 리스트로 반환"""
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)

    full_paths_list = []
    if "Contents" in response:
        for obj in response["Contents"]:
            key = obj["Key"]
            if key.endswith(".parquet"):
                full_path = f"s3a://{s3_bucket}/{key}"
                full_paths_list.append(full_path)
    else:
        print("해당 경로에 파일이 없습니다.")
    return full_paths_list

In [4]:
# 스키마 정의
schema = StructType([
    StructField("stock", StringType(), True),    # 주식이름
    StructField("source", StringType(), True),    # 언론사
    StructField("pub_date", StringType(), True),    # 날짜 및 시간
    StructField("headline", StringType(), True),  # 제목
    StructField("content", StringType(), True)    # 내용
])

# SparkException: Parquet column cannot be converted,Expected: date, Found: BINARY. 오류

In [5]:
# S3 버킷 및 경로
s3_bucket = "de5-finalproj-team5"
s3_prefix = "raw_data/NYTD/"

# S3에서 .parquet 파일 전체 경로 리스트 가져오기
parquet_files = list_parquet_files(s3_bucket, s3_prefix)
print("읽을 파일 리스트:")
for f in parquet_files:
    print(f"  - {f}")

# 파일 리스트가 있으면 Spark로 읽기
# 재사용시에는 특정 파일 이름에 해당하는 파일만 가져올 것
if parquet_files:
    # df = spark.read.parquet(*parquet_files)
    df = spark.read.schema(schema).parquet(*parquet_files)
    df.show(5)
else:
    print("읽을 Parquet 파일이 없습니다.")

읽을 파일 리스트:
  - s3a://de5-finalproj-team5/raw_data/NYTD/2020/nyt_articles_2020.parquet
  - s3a://de5-finalproj-team5/raw_data/NYTD/2021/nyt_articles_2021.parquet
  - s3a://de5-finalproj-team5/raw_data/NYTD/2022/nyt_articles_2022.parquet
  - s3a://de5-finalproj-team5/raw_data/NYTD/2023/nyt_articles_2023.parquet
  - s3a://de5-finalproj-team5/raw_data/NYTD/2024/nyt_articles_2024.parquet
  - s3a://de5-finalproj-team5/raw_data/NYTD/2025/nyt_articles_2025_01.parquet
  - s3a://de5-finalproj-team5/raw_data/NYTD/2025/nyt_articles_2025_02.parquet
+------+------+--------------------+--------------------+--------------------+
| stock|source|            pub_date|            headline|             content|
+------+------+--------------------+--------------------+--------------------+
|Amazon|  NULL|2021-01-02T10:00:...|A Canadian ‘Buy L...|TORONTO — The sno...|
|Amazon|  NULL|2021-01-04T11:00:...|Hundreds of Googl...|OAKLAND, Calif. —...|
|Amazon|  NULL|2021-01-04T16:31:...|Home Solar Is Gro...|The ho

In [6]:
df_cleaned = df \
    .withColumn(
        "source",
        coalesce(df["source"], lit("NYTD"))) \
    .withColumn(
        "pub_date",
        to_date("pub_date", "yyyy-MM-dd'T'HH:mm:ssZ"))


In [7]:
# 전처리 하지 않을 예정이면 df를 그대로 사용
# 전처리 수행하여 cleaned 사용
# df_cleaned = df

# 컬럼명을 변경하여 새로운 DataFrame 생성
# stock 컬럼을 티커 심볼로 매핑하는 컬럼 생성
df_transformed = df_cleaned.select(
    # 매핑 처리: 조건에 따라 티커 심볼로 치환
    when(col("stock") == "Apple", lit("AAPL"))
    .when(col("stock") == "Amazon", lit("AMZN"))
    .when(col("stock") == "Google", lit("GOOGL"))
    .when(col("stock") == "Microsoft", lit("MSFT"))
    .when(col("stock") == "Facebook", lit("META"))
    .when(col("stock") == "Tesla", lit("TSLA"))
    .when(col("stock") == "Netflix", lit("NVDA"))
    .otherwise(col("stock")).alias("symbol"),
    col("source"),
    col("pub_date").alias("datetime"),
    col("headline"),
    col("content").alias("summary")
)

In [8]:
df_transformed.show(10, truncate = False)

+------+------+----------+------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|symbol|source|datetime  |headline                                                                                  |summary                                                                                                                                                                                                                                                                                                                                                                             |
+------+------+---

In [9]:
# 스키마 확인

df_transformed.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- source: string (nullable = false)
 |-- datetime: date (nullable = true)
 |-- headline: string (nullable = true)
 |-- summary: string (nullable = true)



### 중복 확인

In [27]:
# sql 이용 등록

df_transformed.createOrReplaceTempView("nyt_articles")

In [11]:
df_test = spark.sql("""
    SELECT count(DISTINCT symbol)
    FROM nyt_articles""")

In [12]:
df_test.show()

+----------------------+
|count(DISTINCT symbol)|
+----------------------+
|                     7|
+----------------------+



In [29]:
df_duplicate_all = spark.sql("""
    SELECT headline, datetime, symbol, COUNT(*) AS duplicate_count
    FROM nyt_articles
    GROUP BY headline, datetime, symbol
    HAVING COUNT(*) > 1
""")
df_duplicate_all.show(10, truncate=False)

+----------------------------------------------+----------+------+---------------+
|headline                                      |datetime  |symbol|duplicate_count|
+----------------------------------------------+----------+------+---------------+
|How ‘Save the Children’ Is Keeping QAnon Alive|2020-09-29|META  |2              |
+----------------------------------------------+----------+------+---------------+



### 데이터 저장

In [10]:
# 저장할 S3 경로 (폴더 경로)
output_path = "s3a://de5-finalproj-team5/analytic_data/NYTD/"

# 하나의 파일로 저장 (랜덤한 파일명으로 저장됨)
df_transformed.coalesce(1).write.mode("overwrite").parquet(output_path)

print("데이터 저장 완료!")

# S3 클라이언트 생성
s3_client = boto3.client("s3")
bucket_name = "de5-finalproj-team5"
prefix = "analytic_data/NYTD/"

# S3에 저장된 파일 목록 가져오기
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

# 자동 생성된 Parquet 파일 찾기
# bulk로 한번만 실행하는 logic이기에 가능. 다음번에 재사용 한다면 수정 필요 
parquet_file = None
for obj in response.get("Contents", []):
    if obj["Key"].endswith(".parquet"):
        parquet_file = obj["Key"]
        break  # 첫 번째 parquet 파일을 찾으면 중단

if parquet_file:
    # 원하는 파일명으로 복사 (S3 내에서 이동)
    new_filename = "analytic_data/NYTD/nytd_2020_202502_data.parquet"
    copy_source = {'Bucket': bucket_name, 'Key': parquet_file}
    
    s3_client.copy_object(Bucket=bucket_name, CopySource=copy_source, Key=new_filename)

    # 기존 자동 생성된 파일 삭제
    s3_client.delete_object(Bucket=bucket_name, Key=parquet_file)

    print(f"파일명이 `{parquet_file}`에서 `{new_filename}`으로 변경되었습니다.")
else:
    print("Parquet 파일을 찾을 수 없습니다.")


데이터 저장 완료!
파일명이 `analytic_data/NYTD/part-00000-dca80dcb-570c-496b-afa6-d8cfed0dde79-c000.snappy.parquet`에서 `analytic_data/NYTD/nytd_2020_202502_data.parquet`으로 변경되었습니다.


## Before code

In [None]:
# def getFileFromS3(spark: SparkSession, s3_bucket: str, s3_prefix: str):
#     """S3에서 Parquet 파일을 읽어와 DataFrame 반환"""
#     """특정 Bucket 아래에 있는 Parquet 파일 전부 로딩"""
#     s3_path = f"s3a://{s3_bucket}/{s3_prefix}"  # S3 경로 설정
#     print(f"Loading data from: {s3_path}")
    
#     try:
#         df = spark.read.parquet(s3_path)  # Parquet 파일 읽기
#         return df
#     except Exception as e:
#         print(f"Error reading from S3: {e}")
#         return None

# # Spark 세션 생성
# spark = getSparkSession()

# # S3 버킷 및 경로 설정
# s3_bucket = "de5-finalproj-team5"  # 버킷명
# s3_prefix = "raw_data/NYTD/2023/nyt_articles_2023.parquet"  # S3 내부의 파일 경로

# # S3에서 파일 로드
# df = getFileFromS3(spark, s3_bucket, s3_prefix)

# # 데이터 확인
# if df is not None:
#     df.show(5)
# else:
#     print("DataFrame is empty or could not be loaded.")

In [None]:
# # 정규 표현식으로 알파벳, 숫지와 공백만 남긴 후 앞 뒤 공백제거

# df_cleaned = df \
#     .withColumn(
#         "headline",
#         trim(regexp_replace(col("headline"), "[^a-zA-Z0-9\\s]", ""))
#     ) \
#     .withColumn(
#         "content",
#         trim(regexp_replace(col("content"), "[^a-zA-Z0-9\\s]", ""))
#     ) \
#     .withColumn(
#         "pub_date",
#         to_timestamp("pub_date", "yyyy-MM-dd'T'HH:mm:ssZ")
#     )


# # uncased bert는 소문자 처리하기
# # lower(regexp_replace(col("headline"), "[^a-zA-Z0-9\\s]", ""))
# # lower(regexp_replace(col("content"), "[^a-zA-Z0-9\\s]", ""))

In [None]:
# # 중복 그룹을 정의 (headline, datetime, symbol이 같은 경우 같은 그룹)
# window_spec = Window.partitionBy("headline", "datetime", "symbol").orderBy("datetime")

# # 각 그룹별로 row_number를 추가
# df_with_row = df_transformed.withColumn("row_number", row_number().over(window_spec))

# # row_number가 1인 레코드만 유지 (즉, 그룹별 하나만 남기고 삭제)
# df_deduplicated = df_with_row.filter(col("row_number") == 1).drop("row_number")

# # sql 이용 등록

# df_deduplicated.createOrReplaceTempView("nyt_articles_cleaned")

# df_deduplicate_all = spark.sql("""
#     SELECT headline, datetime, symbol, COUNT(*) AS duplicate_count
#     FROM nyt_articles_cleaned
#     GROUP BY headline, datetime, symbol
#     HAVING COUNT(*) > 1
# """)
# df_duplicate_all.show(10, truncate=False)

In [37]:
# # 저장할 S3 경로 설정
# output_path = "s3a://de5-finalproj-team5/analytic_data/NYTD/nytd_2020_202502_data.parquet"

# # # Parquet 파일로 저장 (overwrite 모드: 기존 파일 덮어쓰기)
# # df_transformed.write.mode("overwrite").parquet(output_path)

# df_transformed.coalesce(1).write.mode("overwrite").parquet(output_path)

# print("데이터 저장 완료!")

✅ 데이터 저장 완료!


In [38]:
# # 저장된 데이터 확인

# s3_client = boto3.client("s3")
# bucket_name = "de5-finalproj-team5"
# prefix = "analytic_data/NYTD/"

# response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

# print("S3 저장된 파일 목록:")
# for obj in response.get("Contents", []):
#     print(f"  - {obj['Key']}")


🔹 S3 저장된 파일 목록:
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/_SUCCESS
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/part-00000-91ac7625-9827-43ef-99b0-38236d465c41-c000.snappy.parquet
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/part-00001-91ac7625-9827-43ef-99b0-38236d465c41-c000.snappy.parquet
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/part-00002-91ac7625-9827-43ef-99b0-38236d465c41-c000.snappy.parquet
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/part-00003-91ac7625-9827-43ef-99b0-38236d465c41-c000.snappy.parquet
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/part-00004-91ac7625-9827-43ef-99b0-38236d465c41-c000.snappy.parquet
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/part-00005-91ac7625-9827-43ef-99b0-38236d465c41-c000.snappy.parquet
  - analytic_data/NYTD/nytd_2020_202502_data.parquet/part-00006-91ac7625-9827-43ef-99b0-38236d465c41-c000.snappy.parquet
