In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.master("local[1]").appName("filtered_data").getOrCreate()
filtered_df = spark.read.option("header", True).csv("data/1_filtered_data.csv").orderBy(rand(seed=42))

filtered_df.show()

131072x1 화면 크기가 잘못됐습니다. 문제가 예상됩니다
24/10/26 10:02:32 WARN Utils: Your hostname, DESKTOP-E1527F8 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/10/26 10:02:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/26 10:02:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/26 10:02:40 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , title, content, write_date, url, writer, category
 Schema: _c0, title, content, write_date, url, writer, category
Expected: _c0 but found: 
CSV file: file:///home/jiwoochris/projects/ssafy-custom-news-backend/data_processing/data/1_filtered_data.csv


+---+-----------------------------------+----------------------------------+-------------------+--------------------+--------------+------------+
|_c0|                              title|                           content|         write_date|                 url|        writer|    category|
+---+-----------------------------------+----------------------------------+-------------------+--------------------+--------------+------------+
|255|  대전시 '매봉공원 민간특례사업 ...|      (대전=뉴스1) (이름) 기자 ...|2020-02-14 16:11:42|https://www.news1...|         뉴스1|    여성복지|
|313|     농림부 '안성·천안 ai 고병원...|   【서울=뉴시스】(이름) 기자 =...|2017-12-31 14:07:06|https://newsis.co...|        뉴시스|라이프스타일|
|712|     대전 '코로나19' 확진 연구원...|     (대전ㆍ충남=뉴스1) (이름) ...|2020-02-26 15:23:37|https://www.news1...|         뉴스1|    여성복지|
|395|      (이름), '동안 미모'에 시선...|      (서울=뉴스1) (이름) 기자 ...|2019-05-29 20:38:27|https://www.news1...|         뉴스1|        문화|
|105|  현대차, 캐리비안베이와 '고성능...|   【서울=뉴시스】(이름) 기자 =...|2019-06-24 09:22:50|https://n

In [2]:
import re
from pyspark.sql.functions import udf, array_join, spark_partition_id
from pyspark.sql.types import ArrayType, StringType
from konlpy.tag import Okt

# 배치 처리를 위한 UDF
@udf(ArrayType(StringType()))
def preprocess_text_batch(text):
    if text is None:
        return []
    # 구두점 제거
    text = re.sub("[^가-힣a-zA-Z0-9]+", " ", text)
    # 토큰화 - Okt 객체를 함수 내부에서 생성
    okt = Okt()
    tokens = okt.morphs(text)
    return tokens


# 진행 상황 모니터링을 위한 로깅
filtered_df.cache()  # 데이터 캐싱
total_rows = filtered_df.count()

# 전처리 작업 수행
processed_df = (filtered_df
    .withColumn("processed_content", preprocess_text_batch("content"))
)

# 파티션별 처리 현황 모니터링 (선택사항)
processed_df.groupBy(spark_partition_id()).count().show()

24/10/26 10:02:40 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , title, content, write_date, url, writer, category
 Schema: _c0, title, content, write_date, url, writer, category
Expected: _c0 but found: 
CSV file: file:///home/jiwoochris/projects/ssafy-custom-news-backend/data_processing/data/1_filtered_data.csv
24/10/26 10:02:40 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , title, content, write_date, url, writer, category
 Schema: _c0, title, content, write_date, url, writer, category
Expected: _c0 but found: 
CSV file: file:///home/jiwoochris/projects/ssafy-custom-news-backend/data_processing/data/1_filtered_data.csv

+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|    5|
|                   1|    5|
|                   2|    5|
|                   3|    5|
|                   4|    5|
|                   5|    5|
|                   6|    5|
|                   7|    5|
|                   8|    5|
|                   9|    5|
|                  10|    5|
|                  11|    5|
|                  12|    5|
|                  13|    5|
|                  14|    5|
|                  15|    5|
|                  16|    5|
|                  17|    5|
|                  18|    5|
|                  19|    5|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [3]:
from sklearn.feature_extraction.text import TfidfVectorizer
from joblib import dump
from tqdm import tqdm

def identity(x):
    return x

def create_and_train_tfidf_vectorizer(processed_df, max_features=5000, batch_size=1000):
    print("TF-IDF 벡터라이저 생성 중...")
    tfidf_vectorizer = TfidfVectorizer(max_features=max_features, tokenizer=identity, preprocessor=identity)

    print("TF-IDF 벡터라이저 학습 중...")
    processed_df = processed_df.cache()
    print("데이터프레임이 캐시에 저장되었습니다.")

    total_documents = processed_df.count()
    print(f"총 문서 수: {total_documents}")

    for i in tqdm(range(0, total_documents, batch_size), desc="TF-IDF 학습 진행"):
        end = min(i + batch_size, total_documents)
        batch = processed_df.select("processed_content").rdd.map(lambda x: x[0]).collect()[i:end]
        tfidf_vectorizer.fit(batch)

    print("TF-IDF 벡터라이저 학습 완료")
    return tfidf_vectorizer

def save_tfidf_vectorizer(tfidf_vectorizer, filename='tfidf_vectorizer.joblib'):
    print(f"TF-IDF 벡터라이저 저장 중... ({filename})")
    dump(tfidf_vectorizer, filename)
    print("TF-IDF 벡터라이저 저장 완료")

# 메인 실행 부분
tfidf_vectorizer = create_and_train_tfidf_vectorizer(processed_df)
save_tfidf_vectorizer(tfidf_vectorizer)

TF-IDF 벡터라이저 생성 중...
TF-IDF 벡터라이저 학습 중...
데이터프레임이 캐시에 저장되었습니다.


                                                                                

총 문서 수: 1000


TF-IDF 학습 진행: 100%|██████████| 1/1 [00:12<00:00, 12.86s/it]

TF-IDF 벡터라이저 학습 완료
TF-IDF 벡터라이저 저장 중... (tfidf_vectorizer.joblib)
TF-IDF 벡터라이저 저장 완료





In [4]:
from joblib import load

# TF-IDF 벡터라이저 불러오기
print("TF-IDF 벡터라이저 불러오는 중...")
loaded_tfidf_vectorizer = load('tfidf_vectorizer.joblib')
print("TF-IDF 벡터라이저 불러오기 완료")

# 샘플 데이터 하나 임의로 생성
sample_data = processed_df.select("processed_content").limit(1).collect()[0][0]

print("샘플 데이터:")
print(sample_data)

print("샘플 데이터 타입:")
print(type(sample_data))
print("샘플 데이터 길이:")
print(len(sample_data))
# 샘플 데이터로 벡터 만들기
print("\n벡터 생성 중...")
sample_vector = loaded_tfidf_vectorizer.transform([sample_data])

print("생성된 벡터 형태:")
print(sample_vector.shape)

print("\n벡터의 일부 값:")
print(sample_vector.data[:10])  # 처음 10개 값만 출력


TF-IDF 벡터라이저 불러오는 중...
TF-IDF 벡터라이저 불러오기 완료
샘플 데이터:
['대전', '뉴스', '1', '이름', '기자', '대전', '매', '봉', '공원', '민간', '특례', '사업', '취소', '는', '부당하다는', '법원', '의', '판결', '에', '대', '전시', '가', '항소', '하기로', '했다', '손철웅', '대', '전시', '환경', '녹지', '국장', '은', '14일', '시청', '기자실', '에서', '간담', '회', '를', '갖고', '어제', '13일', '법원', '의', '판결', '은', '이', '사업', '에', '대한', '제안', '내용', '자체', '가', '철회', '됨에', '따른', '민간', '사업자', '가', '입은', '피해', '가', '공익', '훼손', '가치', '보다', '크다는', '것', '이', '요지', '라며', '가장', '핵심', '적', '인', '공익', '과', '사익', '에', '대한', '법원', '판단', '결과', '와', '저희', '들', '이', '갖고', '있는', '견해', '가', '많이', '상반', '되기', '때문', '에', '항소심', '을', '통해', '다시', '대응', '할', '계획', '이라고', '밝혔다', '손', '국장', '은', '법원', '판결', '내용', '에', '대해', '조목조목', '이', '자리', '에서', '반박', '하기', '보다는', '총론', '적', '인', '틀', '에서', '1', '심', '판결', '내용', '에', '추가', '적', '으로', '보완', '하고', '소송', '전략', '을', '다시', '정밀', '하게', '설계', '해', '항소심', '에서', '승소', '하도록', '하겠다', '고', '말', '했다', '그러면서', '자연환경', '보전', '대덕', '연구', '단지', '연구기관', '들', '의', '연구',

In [12]:
# 전체 데이터셋 벡터화 및 CSV 통합
print("전체 데이터셋 벡터화 및 CSV 통합 시작...")

# processed_content를 문자열로 변환
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def vectorize_and_integrate(df, vectorizer):
    return df.withColumn("tfidf_vector", vectorize_udf("processed_content"))

def save_integrated_csv(df, output_path):
    df.drop("processed_content").coalesce(1).write.option("header", "true").mode("overwrite").option("quote", "\"").option("escape", "\"").csv(output_path)

def merge_part_files(output_path, final_output_file):
    import os
    import glob
    import shutil

    part_files = glob.glob(os.path.join(output_path, "part-*.csv"))
    
    with open(final_output_file, "w") as outfile:
        with open(part_files[0], "r") as firstfile:
            outfile.write(firstfile.readline())
        
        for filename in part_files:
            with open(filename, "r") as infile:
                next(infile)
                outfile.write(infile.read())
    
    shutil.rmtree(output_path)

# 메인 실행 부분
vectorized_processed_df = vectorize_and_integrate(processed_df, loaded_tfidf_vectorizer)

print("전체 데이터셋 벡터화 및 CSV 통합 완료")

# 통합된 CSV 저장
print("통합된 CSV 저장 중...")
output_path = "integrated_data_with_vectors.csv"
save_integrated_csv(vectorized_processed_df, output_path)

final_output_file = "data/2_integrated_data_with_vectors.csv"
merge_part_files(output_path, final_output_file)

print(f"단일 CSV 파일로 저장 완료: {final_output_file}")
print("통합된 CSV 저장 완료")

# 저장된 데이터 확인
print("\n저장된 데이터 정보:")
print(f"총 행 수: {vectorized_processed_df.count()}")
print(f"컬럼: {vectorized_processed_df.columns}")
print("첫 10개 행 샘플:")
vectorized_processed_df.drop("processed_content").show(10, truncate=False)


전체 데이터셋 벡터화 및 CSV 통합 시작...
전체 데이터셋 벡터화 및 CSV 통합 완료
통합된 CSV 저장 중...


                                                                                

단일 CSV 파일로 저장 완료: data/2_integrated_data_with_vectors.csv
통합된 CSV 저장 완료

저장된 데이터 정보:


                                                                                

총 행 수: 1000
컬럼: ['_c0', 'title', 'content', 'write_date', 'url', 'writer', 'category', 'processed_content', 'tfidf_vector']
첫 10개 행 샘플:
+---+---------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
# 고유한 카테고리 나열
unique_categories = vectorized_processed_df.select('category').distinct().collect()
print("고유한 카테고리 목록:")
for row in unique_categories:
    print(f"- {row['category']}")



고유한 카테고리 목록:
- 여행레저
- 여성복지
- 라이프스타일
- 문화
- 지역
- 사건사고
- 국제
- IT_과학
- 사회일반
- 스포츠
- 경제
- 취미
- 정치
-  하하 등이 모인 ';유재석 없는 유라인 단톡방';의 존재를 공개했다. 광희는 ""단톡방에서 ';너네가 어떻게 했길래 (이름)이 차고 들어갔냐';
- 산업
- 교육
- 건강
- 연예
- 2020-08-27 11:41:17
-  급기야 ';라디오스타'; mc 김국진과 김구라에게 ';맞춤 사업 아이템';을 제시하며 동업을 전격 제안한다. (이름) 피셜 대박 성공을 보장하는 사업 아이디어는 무엇일지 호기심을 유발한다. . . 한편
-  최씨네 ';자전거 가족';은 부상으로 받은 늘푸름홍천한우에 마냥 웃었다.. . . ."
-  승마
- https://moneys.mt.co.kr/news/mwView.php?no=2014071414211083713
-  lte-m 등 개별 통신 기술을 별도로 검증할 수는 있었지만 망이 연동된 상황에서는 서비스나 단말을 사전에 테스트하기 힘들었다는 뜻이다.. . . . 김 단장은 “안전망 검증센터는 kt의 자산이긴 하지만 재난망과 관련해 기술검증을 하겠다는 곳이라면 어떤 기관이나 기업에도 개방하겠다는 게 kt의 입장""이라며 “공공안전망 사업의 토대 역할을 하겠다는 취지로 센터를 구축했다”고 말했다.. . 실제로 센터 구축 이후 보름만에 여러 회사가 찾아온 것으로 알려졌다.. . 현재 시범사업 단계인 재난망에 적합한 통신장비나 단말
- 지디넷
-  클라우드 등과 관련한 기술들을 소개할 전망이다.. . 기업들이 mwc 부스 참여를 망설이는 이유는 코로나19 확산 우려와 직원 안전 때문인 점도 있지만 오프라인 참여율 저조로 인해 성과 또한 기대할 수 없어서다.. . 지난해 코로나19 사태로 인해 주최 측인 세계이동통신사업자연합회(gsma)는 mwc 개최를 취소한 바 있다. gsma는 기업들이 이미 납부한 10억~30억원에 달하는 참가비를 올해로 이연해줬으나
- https

                                                                                

### 카테고리 Classification 모델 학습

In [7]:
# 카테고리 학습을 위한 준비
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from joblib import dump
import pandas as pd
import numpy as np

print("필요한 라이브러리와 모듈을 가져왔습니다.")

def create_and_train_category_classifier(df, batch_size=100):
    print("카테고리 분류기 생성 중...")

    total_documents = df.count()
    print(f"총 문서 수: {total_documents}")

    # 데이터프레임을 판다스로 변환
    print("데이터프레임을 판다스로 변환 중...")
    pandas_df = df.toPandas()
    print("데이터프레임을 판다스로 변환했습니다.")
    
    # 데이터 전처리
    print("데이터 전처리 중...")
    X = pandas_df['tfidf_vector'].apply(lambda x: np.array(x.strip('[]').split(','), dtype=float))
    y = pandas_df['category']
    print("데이터 전처리 완료")
    print(X.shape, y.shape)
    print(X[:5])
    
    # 모든 고유한 y 값 출력
    unique_categories = y.unique()
    print("고유한 카테고리:")
    for category in unique_categories:
        print(category)
    print(f"총 {len(unique_categories)}개의 고유한 카테고리가 있습니다.")

    # 데이터 분할
    print("데이터 분할 중...")
    X_train, X_test, y_train, y_test = train_test_split(X.tolist(), y, test_size=0.2, random_state=42)

    # 레이블 인코딩
    label_encoder = LabelEncoder()
    y_train = label_encoder.fit_transform(y_train)
    y_test = label_encoder.transform(y_test)

    # 로지스틱 회귀 모델 생성 및 훈련
    model = LogisticRegression(multi_class='ovr', max_iter=1000)
    
    # 배치 크기로 데이터를 나누어 학습
    for i in range(0, len(X_train), batch_size):
        X_batch = X_train[i:i+batch_size]
        y_batch = y_train[i:i+batch_size]
        model.partial_fit(X_batch, y_batch, classes=np.unique(y_train))
    
    # 모델 평가
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"모델 정확도: {accuracy:.2f}")
    
    # 모델 및 레이블 인코더 저장
    dump((model, label_encoder), "data/category_classification_model.joblib")
    print("카테고리 분류 모델이 저장되었습니다.")
    
    return model, label_encoder

# 카테고리 분류기 생성 및 훈련
category_model, category_label_encoder = create_and_train_category_classifier(vectorized_processed_df)
print("카테고리 분류기 생성 및 훈련이 완료되었습니다.")

필요한 라이브러리와 모듈을 가져왔습니다.
카테고리 분류기 생성 중...


                                                                                

총 문서 수: 1000
데이터프레임을 판다스로 변환 중...


                                                                                

데이터프레임을 판다스로 변환했습니다.
데이터 전처리 중...
데이터 전처리 완료
(1000,) (1000,)
0    [0.016097231068149625, 0.0, 0.0, 0.0, 0.0, 0.0...
1    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
2    [0.05750353190410851, 0.0, 0.0, 0.0, 0.0, 0.0,...
3    [0.03604717728727374, 0.0, 0.0, 0.0, 0.0, 0.0,...
4    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
Name: tfidf_vector, dtype: object
고유한 카테고리:
여성복지
라이프스타일
문화
여행레저
지역
사건사고
국제
IT_과학
스포츠
경제
사회일반
취미
 하하 등이 모인 ';유재석 없는 유라인 단톡방';의 존재를 공개했다. 광희는 ""단톡방에서 ';너네가 어떻게 했길래 (이름)이 차고 들어갔냐';
정치
산업
교육
연예
건강
2020-08-27 11:41:17
 급기야 ';라디오스타'; mc 김국진과 김구라에게 ';맞춤 사업 아이템';을 제시하며 동업을 전격 제안한다. (이름) 피셜 대박 성공을 보장하는 사업 아이디어는 무엇일지 호기심을 유발한다. . . 한편
 최씨네 ';자전거 가족';은 부상으로 받은 늘푸름홍천한우에 마냥 웃었다.. . . ."
 승마
https://moneys.mt.co.kr/news/mwView.php?no=2014071414211083713
 lte-m 등 개별 통신 기술을 별도로 검증할 수는 있었지만 망이 연동된 상황에서는 서비스나 단말을 사전에 테스트하기 힘들었다는 뜻이다.. . . . 김 단장은 “안전망 검증센터는 kt의 자산이긴 하지만 재난망과 관련해 기술검증을 하겠다는 곳이라면 어떤 기관이나 기업에도 개방하겠다는 게 kt의 입장""이라며 “공공안전망 사업의 토대 역할을 하겠다는 취지로 센터를 구축했다”고 말했다.. . 

ValueError: y contains previously unseen labels: 'https://www.thisisgame.com/webzine/news/nboard/263/?page=102&n=107736'

In [None]:
from joblib import load

# 저장된 카테고리 분류 모델 로드
loaded_model, loaded_label_encoder = load("data/category_classification_model.joblib")

print("저장된 카테고리 분류 모델을 로드했습니다.")
loaded_label_encoder.inverse_transform([0])