In [2]:
from datetime import datetime
import requests
from bs4 import BeautifulSoup as BS
import pandas as pd
import json

In [3]:
today = datetime.today().strftime("%Y%m%d")
base_url = "https://news.naver.com/main/list.naver?mode=LSD&mid=shm&sid1=101&date=" + today

In [4]:
def get_news_links(page):
    url = base_url + "&page=" + str(page)
    response = requests.get(url)
    soup = BS(response.text, "html.parser")
    
    links = []
    for a in soup.select("ul.type06_headline li dl dt a"):
        links.append(a["href"])
    for a in soup.select("ul.type06 li dl dt a"):
        links.append(a["href"])
    
    return links

def get_news_content(url):
    response = requests.get(url)
    soup = BS(response.text, "html.parser")
    
    title_tag = soup.select_one("h2.media_end_head_headline")
    content_tag = soup.find('article',{'id':'dic_area'})
    
    if title_tag and content_tag:
        title = title_tag.get_text().strip()
        content = content_tag.get_text().strip()
        return title, content
    else:
        return None, None

In [5]:
# 크롤링 시작
news_links = []
page = 1

while True:
    links = get_news_links(page)
    if not links or any(link in news_links for link in links):
        break
    news_links.extend(links)
    page += 1


In [6]:
# 뉴스 기사 내용 크롤링
news_contents = []
for link in news_links[:10]:    # 숫자 변경
    try:
        title, content = get_news_content(link)
        news_contents.append((title, content))
    except Exception as e:
        print(f"Failed to get content from {link}: {e}")

In [7]:
import numpy as np
import itertools

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer # pip install sentence_transformers
from bareunpy import Tagger # pip install bareunpy

tagger = Tagger('koba-Q2CYNCI-XZ7E7PI-X6YRKPY-K4Z2KMY')
model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')


In [8]:
def keyword_ext(text):

    tokenized_doc = tagger.pos(text)
    tokenized_nouns = ' '.join([word[0] for word in tokenized_doc if word[1] == 'NNG' or word[1] == 'NNP'])

    n_gram_range = (1,1)

    count = CountVectorizer(ngram_range=n_gram_range).fit([tokenized_nouns])
    candidates = count.get_feature_names_out()

    doc_embedding = model.encode([text])
    candidate_embeddings = model.encode(candidates)

    return mmr(doc_embedding, candidate_embeddings, candidates, top_n=5, diversity=0.2)

def mmr(doc_embedding, candidate_embeddings, words, top_n, diversity):

    # 문서와 각 키워드들 간의 유사도가 적혀있는 리스트
    word_doc_similarity = cosine_similarity(candidate_embeddings, doc_embedding)

    # 각 키워드들 간의 유사도
    word_similarity = cosine_similarity(candidate_embeddings)

    # 문서와 가장 높은 유사도를 가진 키워드의 인덱스를 추출.
    # 만약, 2번 문서가 가장 유사도가 높았다면
    # keywords_idx = [2]
    keywords_idx = [np.argmax(word_doc_similarity)]

    # 가장 높은 유사도를 가진 키워드의 인덱스를 제외한 문서의 인덱스들
    # 만약, 2번 문서가 가장 유사도가 높았다면
    # ==> candidates_idx = [0, 1, 3, 4, 5, 6, 7, 8, 9, 10 ... 중략 ...]
    candidates_idx = [i for i in range(len(words)) if i != keywords_idx[0]]

    # 최고의 키워드는 이미 추출했으므로 top_n-1번만큼 아래를 반복.
    # ex) top_n = 5라면, 아래의 loop는 4번 반복됨.
    for _ in range(top_n - 1):
        candidate_similarities = word_doc_similarity[candidates_idx, :]
        target_similarities = np.max(word_similarity[candidates_idx][:, keywords_idx], axis=1)

        # MMR을 계산
        mmr = (1-diversity) * candidate_similarities - diversity * target_similarities.reshape(-1, 1)
        mmr_idx = candidates_idx[np.argmax(mmr)]

        # keywords & candidates를 업데이트
        keywords_idx.append(mmr_idx)
        candidates_idx.remove(mmr_idx)

    # print(keywords_idx)

    return [words[idx] for idx in keywords_idx]


In [9]:
# Create a DataFrame to store titles and nouns
df = pd.DataFrame(columns=["Title", "Keywords"])

# Text analysis using Mecab and store in DataFrame
for i, (title, content) in enumerate(set(news_contents), start=1):
    if content:
        keywords = keyword_ext(content)
        new_row = pd.DataFrame({"Title": [title], "Keywords": [keywords]})
        df = pd.concat([df, new_row], ignore_index=True)        

# Display the DataFrame
print(df)

                                         Title                       Keywords
0            LG에너지솔루션, 호주 리튬 광산 대규모 투자…펀더멘탈 강화        [리튬, 협약, 수산화리튬, 전기, 계약]
1            남양유업, 경영진 교체 후 대리점 첫 만남...“협력 지속”        [기업, 서울, 회의실, 남양유업, 사장]
2          [속보]최수연 네이버 대표 "단기적으로 매각 결정하지 않을 것"          [회의, 국회, 참석, 대표, 네이버]
3  "개인맞춤형 헬스케어 사업 확대"..광동제약, 체외진단기기사 '프리시젼' 인수      [주식, 기업, 인수, 매매, 프리시젼바이오]
4                 코레일, 카이스트와 '철도표준 모빌리티 학과' 설립      [직원, 한국철도공사, 총장, 김동규, 사장]
5          인질잡힌 환율에 운신폭 제한…"재정·통화 정책조합 묘수 찾아야"  [금리, 인플레이션, 현대경제연구원, 위기, 소득세]


## 하둡

In [46]:
import os
import subprocess
from pyarrow import fs
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime
import pandas as pd
import pyarrow.csv as pc
# Get the current date and format it as a string
today = datetime.now().strftime("%Y-%m-%d")

# Define the file path in HDFS

#hdfs dfs -mkdir test
file_path = f"/test{today}.csv"
classpath = subprocess.Popen(["/home/ksk/hadoop/bin/hdfs", "classpath", "--glob"], stdout=subprocess.PIPE).communicate()[0]
os.environ["CLASSPATH"] = classpath.decode("utf-8")
hdfs = fs.HadoopFileSystem(host='192.168.0.206', port=8020, user='ksk')

# Pandas DataFrame을 PyArrow의 Table 객체로 변환
table = pa.Table.from_pandas(df)

# Write the table to HDFS as a Parquet file
with hdfs.open_output_stream(file_path) as stream:
    # pc.write_table(table, stream)
    pc.write_csv(table,stream)
print(f"DataFrame saved to HDFS at {file_path}")
    
# # PyArrow를 사용하여 Parquet 포맷으로 데이터 저장
# pq.write_table(table, file_path, filesystem=hdfs)




DataFrame saved to HDFS at /test/2024-07-01.csv


In [50]:
#hdfs dfs -mkdir test
spark.stop()

In [52]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [63]:
df = spark.read.csv("hdfs://192.168.0.206:8020/test/2024-06-28.csv",header=True)

In [65]:
df.count()

                                                                                

8734

In [66]:
spark

In [45]:
df2

Unnamed: 0,Title,content
0,"NDF, 1378.8원/1379.2원…4.65원 상승",원·달러 1개월물 스와프 포인트 -2.35원[이데일리 하상렬 기자] 간밤 뉴욕 차액...
1,"NH투자증권, 부동산PF 나홀로 선방…2Q 영업익 2230억대 기대",서울 여의도 NH투자증권 사옥 전경 /사진 제공=NH투자증권금융권의 뇌관으로 지목...
2,"5월 서울 아파트 거래량 5천여 건, 3년 만에 최대",(서울=연합뉴스) 김도훈 기자 = 5월 서울 아파트 거래량이 5천여 건에 달하는 등...
3,"신협중앙회, 1조원 규모 부실채권 정리 추진",부동산 및 건설업 관련 충당금 추가 적립\n\n\n\n신협중앙회는 1일 총 1조원 ...
4,"'기후동행카드' 본사업 시작…""단기권 출시·할인 혜택 확대""",6개월 간 시범 운영 기간 시민 의견 반영단기권 구매자도 할인 혜택 받을 수 있어 ...
...,...,...
6993,"부산창경, 30억 원 한국모태펀드 투자금 결성 완료",부산창조경제혁신센터(이하 부산창경)가 지역 스타트업의 성장을 위한 30억 원 규모의...
6994,컨테이너 가득 쌓인 부산항,[부산=뉴시스] 하경민 기자 = 산업통상자원부는 지난달 수출이 전년보다 5.1% 증...
6995,"'한전 납품' 상장사 아이앤씨, 하청업체 갑질 '덜미'","공정위, 하도급법 위반으로 시정명령 벌점 누적되면 공공입찰 참가 제한\n\n\n\n..."
6996,"""밸류업 인센티브 구체화…'주주환원' 여력 지주사 주목""",SK증권 보고서[이데일리 이용성 기자] 밸류업 프로그램에 대한 인센티브가 구체화하고...



첫 번째 방법은 HDFS의 출력 스트림을 명시적으로 열고 닫으며, 스트림을 직접 다룰 수 있는 유연성을 제공합니다. 이 방법은 대용량 데이터를 점진적으로 쓰거나 스트림 제어가 필요한 상황에서 유리합니다.
두 번째 방법은 PyArrow의 고수준 API를 사용하여 간단하고 직관적으로 데이터를 HDFS에 저장합니다. 코드가 간결하고 파일 시스템을 추상화하기 때문에 다양한 파일 시스템에도 쉽게 적용할 수 있습니다.