In [1]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from konlpy.tag import Okt
import networkx as nx
import numpy as np
from tqdm import tqdm
import os
import ray
from itertools import combinations
from collections import Counter

In [27]:
ray.init(num_cpus=16, ignore_reinit_error=True)

2024-07-25 20:08:28,690	INFO worker.py:1788 -- Started a local Ray instance.


0,1
Python version:,3.9.19
Ray version:,2.32.0


In [3]:
def tokenize(text):
    # 형태소 분석을 통해 명사만 추출
    return [word for word in okt.nouns(text) if len(word) > 1]

In [4]:
def extract_keywords_tfidf(text, num_keywords=20):
    # TF-IDF 벡터화기 생성
    vectorizer = TfidfVectorizer(tokenizer=tokenize, min_df=1)
    X = vectorizer.fit_transform([text])  # 문자열 전체를 하나의 문서로 처리

    # 단어와 TF-IDF 점수 매핑
    tfidf_scores = X.sum(axis=0).A1
    words = vectorizer.get_feature_names_out()

    # 단어와 TF-IDF 점수를 데이터프레임으로 변환
    tfidf_df = pd.DataFrame({'Word': words, 'TF-IDF Score': tfidf_scores})

    # TF-IDF 점수로 정렬하여 상위 num개 키워드 출력
    return tfidf_df.sort_values(by='TF-IDF Score', ascending=False).head(num_keywords)

In [5]:
def extract_keywords_textrank(text, num_keywords=20):
    okt = Okt()
    
    # 형태소 분석 및 명사 추출
    words = tokenize(text)
    
    # 단어들의 출현 빈도수 계산
    word_counts = Counter(words)
    
    # 그래프 생성
    graph = nx.Graph()
    
    # 단어 노드 추가
    for word, count in word_counts.items():
        if count > 1:  # 최소 출현 빈도 조건
            graph.add_node(word, count=count)
    
    # 단어의 연결을 위한 윈도우 크기 설정
    window_size = 4
    
    # 윈도우 안에서 단어 간의 연결 설정
    for i in range(len(words) - window_size + 1):
        window_words = words[i:i + window_size]
        for w1, w2 in combinations(window_words, 2):
            if graph.has_node(w1) and graph.has_node(w2):
                if graph.has_edge(w1, w2):
                    graph[w1][w2]['weight'] += 1
                else:
                    graph.add_edge(w1, w2, weight=1)
    
    # PageRank 계산
    rank = nx.pagerank(graph, weight='weight')
    
    # 상위 num_keywords개의 키워드 추출
    top_keywords = sorted(rank.items(), key=lambda x: x[1], reverse=True)[:num_keywords]
    
    #return top_keywords

    return [keyword for keyword, _ in top_keywords]

In [6]:
def extract_keywords_df(df, num=20): #func=extract_keywords_tfidf, num=20):
    doc_keywords = []
    
    for doc in df['text'] :
        #keywords = extract_keywords_tfidf(doc, num)
        #doc_keywords.append(keywords)
        doc_keywords.append(doc[:10])
        
    #df.loc[:, 'keywords'] = doc_keywords
    
    return doc_keywords

In [7]:
def test(text):
    
    
    
    okt = Okt()
    
    # 형태소 분석 및 명사 추출
    words = tokenize(text)
    
    return text[:20]

In [28]:
@ray.remote
# 각 데이터 프레임 청크에서 키워드 추출 함수 적용
def process_chunk(chunk):
    
    def tokenize(text):
        # 형태소 분석을 통해 명사만 추출
        return [word for word in Okt().nouns(text) if len(word) > 1]
    
    def test(text):
                
        # 형태소 분석 및 명사 추출
        words = tokenize(text) 
        print(type(words))
        
        return words #text[:10]
    
    def extract_keywords_textrank(text, num_keywords=20):
   
        # 형태소 분석 및 명사 추출
        words = tokenize(text)
    
        # 단어들의 출현 빈도수 계산
        word_counts = Counter(words)

        # 그래프 생성
        graph = nx.Graph()

        # 단어 노드 추가
        for word, count in word_counts.items():
            if count > 1:  # 최소 출현 빈도 조건
                graph.add_node(word, count=count)

        # 단어의 연결을 위한 윈도우 크기 설정
        window_size = 4

        # 윈도우 안에서 단어 간의 연결 설정
        for i in range(len(words) - window_size + 1):
            window_words = words[i:i + window_size]
            for w1, w2 in combinations(window_words, 2):
                if graph.has_node(w1) and graph.has_node(w2):
                    if graph.has_edge(w1, w2):
                        graph[w1][w2]['weight'] += 1
                    else:
                        graph.add_edge(w1, w2, weight=1)

        # PageRank 계산
        rank = nx.pagerank(graph, weight='weight')

        # 상위 num_keywords개의 키워드 추출
        top_keywords = sorted(rank.items(), key=lambda x: x[1], reverse=True)[:num_keywords]

        return top_keywords

        #return [keyword for keyword, _ in top_keywords]

    chunk['keywords'] = chunk['text'].apply(extract_keywords_textrank)

    return chunk

In [9]:
# 데이터 프레임을 청크로 나누는 함수
def split_dataframe(df, chunk_size):
    chunks = [df[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
    
    return chunks

In [10]:
directory_path = '/home/osung/data/korean/modu/json'
df = pd.read_csv(directory_path+'/combined_news.tsv', sep='\t')

In [29]:
len(df)

3611424

In [30]:
len(df) / 2

1805712.0

In [31]:
length = int(len(df) / 2)

In [32]:
length

1805712

In [33]:
length / 16

112857.0

In [34]:
len(df[:length])

1805712

In [37]:
chunk_size = int(length/16)
chunks = split_dataframe(df[:length], chunk_size)

In [38]:
len(chunks[0])

112857

In [None]:
# Ray를 사용하여 병렬 처리
futures = [process_chunk.remote(chunk) for chunk in chunks]


# tqdm을 사용하여 진행 상황 표시
results = []
for future in tqdm(ray.get(futures), total=len(futures), desc="Processing"):
    results.append(future)

#result_chunks = ray.get(futures)

In [164]:
print(result_chunks[0])

                   id topic  \
0    NIRW1900000001.1    사회   
1    NIRW1900000001.4    사회   
2    NIRW1900000001.5    사회   
3    NIRW1900000001.6    사회   
4    NIRW1900000001.7    사회   
5    NIRW1900000001.8    사회   
6    NIRW1900000001.9    사회   
7   NIRW1900000001.11    사회   
8   NIRW1900000001.12    사회   
9   NIRW1900000001.17    기술   
10  NIRW1900000001.23    사회   
11  NIRW1900000001.25    사회   
12  NIRW1900000001.27    경제   
13  NIRW1900000001.31    경제   
14  NIRW1900000001.32    사회   
15  NIRW1900000001.34    사회   
16  NIRW1900000001.36    사회   
17  NIRW1900000001.37    정치   
18  NIRW1900000001.39    사회   
19  NIRW1900000001.40    사회   
20  NIRW1900000001.41    사회   
21  NIRW1900000001.43    사회   
22  NIRW1900000001.44    사회   
23  NIRW1900000001.47    사회   
24  NIRW1900000001.49    사회   

                                                 text  \
0   "대통령, 시장 방문만 하지 말고 실천해달라" 2008년의 마지막 새벽, 언론의 카...   
1   진성호 의원, 겨우 이 정도였나 오늘(31일) 한나라당 의원 총회에서 조선일보 출신...   
2   "소의 해, 정치인들 싸움 좀 그

In [172]:
ray.shutdown()

In [20]:
len(result_chunks)

4

In [20]:
len(df)/ 4

902856.0

In [13]:
df2 = df.drop(columns=['topic'])

In [14]:
df2.columns

Index(['id', 'text'], dtype='object')

In [22]:
# 데이터 병렬 처리
num_cores = 4 #mp.cpu_count()
df_with_keywords = parallelize_dataframe(df[:1000], test, num_cores=num_cores)

250
250
250
250


In [50]:
%%time 

keys = extract_keywords_df(df[:100])

CPU times: user 50 s, sys: 391 ms, total: 50.4 s
Wall time: 49.5 s


In [51]:
# 데이터 병렬 처리
num_cores = 2 #mp.cpu_count()
keys2 = parallelize_dataframe(df[:100], extract_keywords_df, num_cores=num_cores)

KeyboardInterrupt: 

In [35]:
df3.keywords[0]

Unnamed: 0,Word,TF-IDF Score
93,사람,0.305852
4,가락시장,0.244682
99,새벽,0.214096
5,가명,0.214096
121,시장,0.214096
116,수산시장,0.183511
82,배추,0.183511
115,수산물,0.152926
102,생선,0.152926
97,상인,0.152926


In [20]:
print(df_with_keywords.head())

                 id topic                                               text  \
0  NIRW1900000001.1    사회  "대통령, 시장 방문만 하지 말고 실천해달라" 2008년의 마지막 새벽, 언론의 카...   
1  NIRW1900000001.4    사회  진성호 의원, 겨우 이 정도였나 오늘(31일) 한나라당 의원 총회에서 조선일보 출신...   
2  NIRW1900000001.5    사회  "소의 해, 정치인들 싸움 좀 그만하시죠" 지난해 12월 31일 대구국채보상기념운동...   
3  NIRW1900000001.6    사회  'MB악법 저지' 촛불 들고 새해 맞은 대전시민들 촛불의 물결이 전국을 뒤 덮었던 ...   
4  NIRW1900000001.7    사회  산(酸) 몰아내니 누이 좋고 매부 좋고 겨울철 우리의 입맛을 돋우는 먹을거리 가운데...   

                  keywords  
0  [것은, 대통령은, 말했다, 있는, 있다]  
1  [것은, 대통령은, 말했다, 있는, 있다]  
2  [것은, 대통령은, 말했다, 있는, 있다]  
3  [것은, 대통령은, 말했다, 있는, 있다]  
4  [것은, 대통령은, 말했다, 있는, 있다]  


In [11]:
df[:10]

Unnamed: 0,id,text
0,1,This is a test sentence number 1
1,2,This is a test sentence number 2
2,3,This is a test sentence number 3
3,4,This is a test sentence number 4
4,5,This is a test sentence number 5
5,6,This is a test sentence number 6
6,7,This is a test sentence number 7
7,8,This is a test sentence number 8
8,9,This is a test sentence number 9
9,10,This is a test sentence number 10


In [26]:
ray.shutdown()

[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: d764e70c0fb13e468307b277d9635acfa1e7eb6101000000 Worker ID: 3952da5b80deaaae97f62968d66174dda8643eda193deda265cefc9f Node ID: 2aec5a255ab54b6fe54a76ab62d86869d83abfc198f5fcbad9261074 Worker IP address: 203.250.229.132 Worker port: 34865 Worker PID: 8938 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.[32m [repeated 2x across cluster][0m
