<a href="https://colab.research.google.com/github/seulmi0827/fininsight/blob/main/JACE/full_refine_sentence.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install -q kss flashtext
!pip install -q pandarallel

In [None]:
!apt-get update
!apt-get install g++ openjdk-8-jdk -y
!pip install konlpy
!pip install mecab-python
!apt-get install curl -y
!bash <(curl -s https://raw.githubusercontent.com/konlpy/konlpy/master/scripts/mecab.sh)

# import

In [None]:
from typing import List
import pandas as pd
import re
from kss import split_sentences
from flashtext import KeywordProcessor
from pandarallel import pandarallel
from konlpy.tag import Mecab

## 전처리 함수

In [None]:
def expand_aspect_terms(aspect_terms: List[str]) -> List[str]:
    expanded = set()

    for term in aspect_terms:
        if '_' in term:
            clean_term = term.replace('_', ' ')
            expanded.add(clean_term) # 언더바를 띄어쓰기로 대체한 단어 추가
            expanded.add(clean_term.replace(" ", ""))  # 띄어쓰기를 제거한 단어 추가
        elif ' ' in term:
            expanded.add(term) # 띄어쓰기 된 원본 단어 추가
            clean_term = term.replace(' ', '')
            expanded.add(clean_term) # 띄어쓰기 제거된 단어 추가
        else:
            clean_term = term # 언더바도 아니고 띄어쓰기도 아닌 원래 단어
            expanded.add(clean_term)

    return list(expanded)


def extract_aspect_sentences(df, aspect_terms):
    pandarallel.initialize(progress_bar=True, verbose=1)

    expanded_terms = expand_aspect_terms(aspect_terms)

    keyword_processor = KeywordProcessor()
    for term in expanded_terms:
        keyword_processor.add_keyword(term)

    # 정규식 패턴 생성
    pattern_dict = {
        term: re.compile(rf'\b{re.escape(term)}\b') for term in expanded_terms
    }

    def process_content(content):
        results = []
        try:
            # 괄호 사이에 있는 내용 제거
            def remove_brackets(text):
                prev_text = ""
                current_text = text

                while prev_text != current_text:
                    prev_text = current_text
                    current_text = re.sub(r'\([^()]*\)', '', current_text)  # 소괄호
                    current_text = re.sub(r'\[[^\[\]]*\]', '', current_text)  # 대괄호
                    current_text = re.sub(r'\{[^\{\}]*\}', '', current_text)  # 중괄호
                    current_text = re.sub(r'\<[^\<\>]*\>', '', current_text)  # 꺾쇠괄호

                return current_text

            # 줄바꿈 기준으로 문장 분리 후 각 줄의 끝에 마침표 확인, 없으면 추가
            lines = content.split('\n')
            for i in range(len(lines)):
                lines[i] = lines[i].strip()
                if lines[i] and not lines[i].endswith('.') and not lines[i].endswith('!') and not lines[i].endswith('?'):
                    lines[i] = lines[i] + '.'


            # 줄바꿈 처리된 문장들을 다시 합침
            content = ' '.join(lines).replace('\r', ' ')

            # 괄호 사이에 있는 내용 제거
            content = remove_brackets(content)

            # 이메일 형식 제거
            content = re.sub(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', '', content)

            # 전화번호 형태 제거
            content = re.sub(r'(\d{2,3}[-\s]?\d{3,4}[-\s]?\d{4})', '', content)

            # 특수문자(물음표) : 의문문 또는 특수기호가 물음표로 잘 못 들어간 경우
            content = re.sub(r'\S\?\S', '', content)

            # 특수문자와 한글/영숫자/공백/일부 기호를 제외한 모든 문자 제거 (한자도 제거)
            content = re.sub(r'[^0-9a-zA-Z\s가-힣.,!%\'\"]', '', content)

            # 연속 공백 처리
            content = re.sub(r'\s+', ' ', content).strip()

            # KSS로 문장 분리
            # 문장 분리 후에는 아예 제거하는 방식으로 접근 (문장 분리 전에는 일부분만 제거하는 방식)
            sentences = split_sentences(content)

            for sentence in sentences:
                # '..' 또는 ',,.' 등이 포함된 문장 제거
                if '..' in sentence or '...' in sentence or ' ..' in sentence or ' ...' in sentence:
                    continue

                # 인터뷰 제거
                if "Q." in sentence:
                    continue

                # "기자" 패턴 체크 및 처리
                if ' 기자 ' in sentence:
                    # print(f"기자 패턴 발견: {sentence}")
                    reporter_idx = sentence.find(' 기자 ')
                    # print(f"기자 인덱스: {reporter_idx}")

                    # "기자" 앞에 2-3글자 이름이 있는지 확인
                    if reporter_idx >= 3:  # 최소 3글자(공백1+이름2) 이상 앞에 있어야 함
                        prev_char = sentence[reporter_idx-1] # 동
                        prev_two_chars = sentence[reporter_idx-2:reporter_idx] # 길동
                        prev_three_chars = sentence[reporter_idx-3:reporter_idx] # 홍길동
                        # print(f"이전 1글자: '{prev_char}', 이전 2글자: '{prev_two_chars}', 이전 3글자: '{prev_three_chars}'")

                        # 2글자(외자) 또는 3글자(이름) 패턴 확인
                        is_two_char_match = len(prev_two_chars) == 2 and re.match(r'^[가-힣]{2}$', prev_two_chars)
                        is_three_char_match = len(prev_three_chars) == 3 and re.match(r'^[가-힣]{3}$', prev_three_chars)
                        # print(f"2글자 매치: {is_two_char_match}, 3글자 매치: {is_three_char_match}")

                        if is_two_char_match or is_three_char_match:
                            # "기자 " 위치 이후 문장만 사용
                            parts = sentence.split(' 기자 ', 1)  # 최대 1번만 분할
                            # print(f"분할 결과: {parts}")
                            if len(parts) > 1:
                                old_sentence = sentence
                                sentence = parts[1].strip()
                                # print(f"변경 전: '{old_sentence}'")
                                # print(f"변경 후: '{sentence}'")

                                # 처리 후 문장이 비어있으면 건너뛰기
                                if not sentence:
                                    # print("비어있는 문장으로 건너뛰기")
                                    continue

                # 30자 이하 문장 제거
                if len(sentence) <= 30:
                    continue

                # 형태소 분석
                mecab = Mecab()
                pos_tagged = mecab.pos(sentence)

                if not pos_tagged:
                    continue

                # 자연스럽지 않은 문장 끝 필터링
                if (pos_tagged[-1][1].startswith('N') or   # 명사 품사
                    # 예: NNG(일반 명사): 학교, 사과, 의자
                    # 예: NNP(고유 명사): 서울, 한국, 태평양
                    # 예: NNB(의존 명사): 것, 데, 수, 바
                    pos_tagged[-1][1].startswith('J') or   # 조사 품사
                    # 예: JKS(주격 조사): ~이, ~가
                    # 예: JKO(목적격 조사): ~을, ~를
                    # 예: JKB(부사격 조사): ~에, ~에서
                    # 예: JKG(관형격 조사): ~의
                    pos_tagged[-1][1].startswith('MM') or  # 관형사 품사
                    # 예: MMD(지시 관형사): 이, 그, 저
                    # 예: MMN(수 관형사): 한, 두, 세
                    # 예: MMA(성상 관형사): 새, 헌, 첫
                    pos_tagged[-1][1].startswith('MA') or  # 부사 품사
                    # 예: MAG(일반 부사): 매우, 빨리, 천천히
                    # 예: MAJ(접속 부사): 그리고, 또한, 그러나
                    pos_tagged[-1][1] == 'SN' or          # 숫자 품사
                    # 예: SN(숫자): 1, 2, 3, 하나, 둘, 셋
                    (pos_tagged[-1][1] == 'EF')           # 종결 어미 품사
                    # 예: EF(종결 어미): ~다, ~요, ~네, ~죠, ~습니다
                    ):
                    continue

                # 공백+마침표 패턴 제거
                sentence = re.sub(r' \.', '.', sentence)

                # 문장 복사 (이후 조사 제거를 위해)
                processed_sentence = sentence

                # 조사 제거
                current_pos = len(processed_sentence)
                for i in range(len(pos_tagged)-1, -1, -1):
                    word, pos = pos_tagged[i]
                    if pos.startswith("J"):
                        word_pos = processed_sentence.rfind(word, 0, current_pos)
                        if word_pos != -1:
                            processed_sentence = processed_sentence[:word_pos] + processed_sentence[word_pos+len(word):]
                            current_pos = word_pos

                # 키워드 검색
                keywords_found = keyword_processor.extract_keywords(processed_sentence)
                if keywords_found:
                    for term, pattern in pattern_dict.items():
                        if pattern.search(processed_sentence):
                            results.append({'original_sentence': sentence, 'processed_sentence': processed_sentence, 'term': term})

        except Exception as e:
            print(f"오류 발생: {e}")

        return results

    # 병렬 처리
    test_df = df.head(2000)
    all_results = test_df['content'].parallel_apply(process_content)

    # 결과 합치기
    flattened_results = [item for sublist in all_results for item in sublist]
    return pd.DataFrame(flattened_results)

## term 통합

In [None]:
# term 가져오기
result_file1 = pd.read_csv("/content/drive/MyDrive/fin/키워드/사회_both_1.csv")
result_file2 = pd.read_csv("/content/drive/MyDrive/fin/키워드/사회_disagreement_re_gpt.csv")
merged_result = pd.concat([result_file1, result_file2], ignore_index=True)
merged_result = merged_result.rename(columns={'단어': 'term'})
result_terms = set(merged_result['term'].dropna().astype(str).tolist())

print(len(result_file1))
print(len(result_file2))
print(len(merged_result))
print(len(result_terms))


In [None]:
# 기사 데이터 불러오기
df_file1 = pd.read_csv('/content/drive/MyDrive/fin/뉴스 데이터/NewsResult_20250421-20250422.csv', encoding='utf-8')
df_file2 = pd.read_csv('/content/drive/MyDrive/fin/뉴스 데이터/NewsResult_20250122-20250123.csv', encoding='utf-8')
df_file3 = pd.read_csv('/content/drive/MyDrive/fin/뉴스 데이터/NewsResult_20241022-20241023.csv', encoding='utf-8')
df_file4 = pd.read_csv('/content/drive/MyDrive/fin/뉴스 데이터/NewsResult_20240722-20240723.csv', encoding='utf-8')
df_file5 = pd.read_csv('/content/drive/MyDrive/fin/뉴스 데이터/NewsResult_20240422-20240423.csv', encoding='utf-8')

merged_df = pd.concat([df_file1, df_file2, df_file3, df_file4, df_file5], ignore_index=True)
merged_df = merged_df.rename(columns={'본문': 'content'})
df = merged_df.copy()
df["content"] = df["content"].fillna("").astype(str)

In [None]:
print(f"로드된 컬럼 : {list(df.columns)}")
print(f"로드된 기사: {len(df)}개")
print()

print("문장 추출 시작...")
result_df = extract_aspect_sentences(df, result_terms)
print(f"매칭된 문장 : {len(result_df)}")
result_df[['original_sentence','term']]

## term별 문장수

In [None]:
term_counts = result_df.groupby('term').size().reset_index(name='sentence_count')
term_counts = term_counts.sort_values('sentence_count', ascending=False) # 정렬
print(f"토탈 카운트 : {len(term_counts)}")
print()
print(term_counts)

## 데이터 저장

In [None]:
# term 별 문장 5개 랜덤 추출
# random_sentences = result_df.groupby('term').apply(lambda x: x.sample(min(len(x), 5))).reset_index(drop=True)

# 문장 길이 컬럼 추가 후 긴 문장 5개 추출
random_sentences = result_df.groupby('term').apply(
    lambda x: x.assign(length=x['original_sentence'].str.len()).nlargest(min(len(x), 5), 'length')
).reset_index(drop=True)

# 필요한 컬럼만 선택 (전처리 문장(조사제거X)과 키워드)
result_sentences = random_sentences[['original_sentence', 'term']]

result_sentences.to_csv('/content/drive/MyDrive/fin/aspect_sentences/0423_final_aspect_sentences.csv', index=False, encoding='utf-8-sig')