# 모듈 가져오기

In [2]:
import dart_fss as dart
import zipfile
import io
import os
import xml.etree.ElementTree as ET
from dart_fss.utils.request import request
from bs4 import BeautifulSoup, NavigableString
import re
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, T5ForConditionalGeneration

# 함수 정의

## 'Open DART 사용'에서 쓰이는 함수

In [28]:
# 파일 다운로드 함수
def download_zip_file(rcept_no, api_key, save_dir="."):
    url = f"https://opendart.fss.or.kr/api/document.xml?crtfc_key={api_key}&rcept_no={rcept_no}"
    zip_file_path = os.path.join(save_dir, f"{rcept_no}.zip")

    try:
        response = request.get(url, stream=True)
        if response.status_code == 200:
            os.makedirs(save_dir, exist_ok=True)
            with open(zip_file_path, 'wb') as f:
                f.write(response.content)
            print(f"ZIP 파일이 {zip_file_path}에 성공적으로 저장되었습니다.")
            return zip_file_path
        else:
            print(f"다운로드 실패. 상태 코드: {response.status_code}")
            return None
    except request.RequestException as e:
        print(f"다운로드 중 오류 발생: {e}")
        return None

# ZIP 파일에서 XML 파일 추출 함수
def extract_and_clean_xml(zip_file_path, save_dir="."):
    try:
        with zipfile.ZipFile(zip_file_path) as zf:
            for file_name in zf.namelist():
                if file_name.endswith(".xml"):
                    with zf.open(file_name) as xml_file:
                        xml_content = xml_file.read().decode('utf-8', errors='replace')
                        clean_content = clean_xml_content(xml_content)
                        html_file_path = os.path.join(save_dir, f"{os.path.splitext(file_name)[0]}_clean.html")
                        with open(html_file_path, 'w', encoding='utf-8') as html_file:
                            html_file.write(clean_content)
                        print(f"{file_name}이(가) {html_file_path}에 저장되었습니다.")
    except zipfile.BadZipFile as e:
        print(f"ZIP 파일 오류: {e}")

# XML 콘텐츠 클리닝 함수
def clean_xml_content(xml_content):
    soup = BeautifulSoup(xml_content, 'xml')
    for tag in soup.find_all(True):
        tag.attrs = {}
    body = soup.find(lambda tag: tag.name and tag.name.lower() == 'body')
    return body.prettify() if body else ""

# HTML 파일 전처리 함수
def preprocess_html_for_model(file_path, max_length=1024):
    with open(file_path, 'r', encoding='utf-8') as file:
        html_content = file.read()

    soup = BeautifulSoup(html_content, 'html.parser')
    extracted_text = []

    def recursive_extract(element, indent_level=0):
        table_tags = {'table', 'tr', 'td', 'th', 'te', 'tu'}
        if element.name:
            if element.name in table_tags:
                cell_text = element.get_text(separator=' ', strip=True)
                if cell_text and not re.fullmatch(r'[^\w]+', cell_text):
                    extracted_text.append(f"<{element.name}>")
                    for child in element.children:
                        recursive_extract(child, indent_level)
                    extracted_text.append(f"</{element.name}>")
            else:
                if element.get_text(strip=True):
                    indent = '  ' * indent_level
                    children_text = []
                    for child in element.children:
                        if isinstance(child, NavigableString):
                            text = child.strip()
                            if text:
                                children_text.append(text)
                        else:
                            recursive_extract(child, indent_level + 1)
                    if children_text:
                        extracted_text.append(indent + ' | '.join(children_text))

        elif isinstance(element, NavigableString):
            text = element.strip()
            if text and not re.fullmatch(r'[^\w]+', text):
                extracted_text.append(text)

    body = soup.find('body')
    if body:
        recursive_extract(body)

    final_text = '\n'.join(extracted_text)
    final_text = re.sub(r'\n+', '\n', final_text).strip()

    tokens = final_text.split()
    if len(tokens) > max_length:
        tokens = tokens[:max_length]

    return ' '.join(tokens)

# 테이블 태그를 제거하면서 구조 유지
def remove_table_tags_but_keep_structure(text):
    text = re.sub(r'</tr>', '\n', text, flags=re.IGNORECASE)
    text = re.sub(r'<(td|th)>', '| ', text, flags=re.IGNORECASE)
    text = re.sub(r'</(td|th)>', ' ', text, flags=re.IGNORECASE)
    text = re.sub(r'<[^>]+>', '', text)
    text = re.sub(r'\|+', '|', text)
    text = re.sub(r'\|\s*\n', '\n', text)
    return text.strip()


def search_company():
    # 회사명 입력
    company_name = input("회사명을 입력하세요: ").strip()

    # 회사명으로 기업 검색
    search_results = corp_list.find_by_corp_name(company_name, exactly=False)
    
    # 검색 결과 확인
    if not search_results:
        print("해당하는 회사명이 없습니다. 셀을 재실행 해주세요.")
        return None, None

    # 검색 결과 출력
    print("\n검색된 회사 목록:")
    for idx, company in enumerate(search_results):
        print(f"{idx + 1}. 회사명: {company.corp_name}, 기업코드: {company.corp_code}")
    
    # 사용자로부터 회사 선택
    while True:
        try:
            choice = int(input("\n원하는 회사 번호를 선택하세요: "))
            if 1 <= choice <= len(search_results):
                selected_company = search_results[choice - 1]
                print(f"\n선택된 회사: {selected_company.corp_name}, 기업코드: {selected_company.corp_code}")
                return selected_company.corp_code, selected_company.corp_name
            else:
                print(f"1부터 {len(search_results)} 사이의 번호를 입력하세요.")
        except ValueError:
            print("숫자를 입력해주세요.")
        except IndexError:
            print("유효한 번호를 입력해주세요.")

# 검색 기간 입력 함수
def get_search_period():
    while True:
        start_date = input("검색 시작일을 입력하세요 (예: 20230101): ").strip()
        end_date = input("검색 종료일을 입력하세요 (예: 20231231): ").strip()

        if len(start_date) == 8 and len(end_date) == 8 and start_date.isdigit() and end_date.isdigit():
            return start_date, end_date
        else:
            print("날짜 형식이 올바르지 않습니다. YYYYMMDD 형식으로 입력해주세요.")

## '보고서 요약'에서 쓰이는 함수

In [4]:
# 요약 생성 함수
def summarize_text(model_name, model_class, input_text):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = model_class.from_pretrained(model_name)

    # GPU 설정 (GPU가 없으면 CPU 사용)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # 입력 텍스트 토큰화
    inputs = tokenizer(input_text, return_tensors="pt", max_length=512, truncation=True).to(device)

    # 요약 생성
    with torch.no_grad():
        summary_ids = model.generate(inputs.input_ids, max_length=150, num_beams=4, early_stopping=True)

    # 요약문 디코딩
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)
    
# | 기호를 줄바꿈으로 변환하는 함수
def replace_pipes_with_newlines(text):
    # Replace '|' with newline and clean up unnecessary spaces
    text = re.sub(r'\s*\|\s*', '\n', text)
    # Replace multiple newlines with a single newline
    text = re.sub(r'\n+', '\n', text).strip()
    return text

# 통합 요약 생성 함수
def generate_combined_summary(rcept_no, save_dir, input_text):
    lcw99_summary = summarize_text("lcw99/t5-base-korean-text-summary", AutoModelForSeq2SeqLM, input_text)
    kot5_summary = summarize_text("psyche/KoT5-summarization", T5ForConditionalGeneration, input_text)

    # KoT5 요약에서 | 기호를 줄바꿈으로 변환
    kot5_summary_with_newlines = replace_pipes_with_newlines(kot5_summary)

    # 통합 요약 생성
    combined_summary = f"***메인 요약***:\n{lcw99_summary}\n\n" \
                       f"***보조 요약***:\n{kot5_summary_with_newlines}"
    
    # 결과를 파일로 저장
    output_file_path = os.path.join(save_dir, f"{rcept_no}_combined_summary.txt")
    with open(output_file_path, 'w', encoding='utf-8') as f:
        f.write(combined_summary)

    print(f"통합 요약 결과가 {output_file_path}에 저장되었습니다.")

# 전체 워크플로 함수
def process_reports(all_reports, save_dir, api_key):
    for report in all_reports:
        rcept_no = report.rcept_no
        input_file_path = os.path.join(save_dir, f"{rcept_no}_processed_text_for_model.txt")

        if os.path.exists(input_file_path):
            with open(input_file_path, 'r', encoding='utf-8') as f:
                input_text = f.read()
            generate_combined_summary(rcept_no, save_dir, input_text)
        else:
            print(f"{rcept_no}: 입력 파일이 존재하지 않습니다.")

# 코드 실행

## Open DART 사용

In [5]:
# Open DART API KEY 설정
api_key = 'your-API-KEY'

dart.set_api_key(api_key=api_key)

# DART 에 공시된 회사 리스트 불러오기
corp_list = dart.get_corp_list()

                                   

In [19]:
# 기업 정보 조회
company_name = input("회사명을 입력하세요: ")
start_date = input("검색 시작일을 입력하세요 (예: 20230101): ")
end_date = input("검색 종료일을 입력하세요 (예: 20231231): ")

# 입력된 회사명으로 기업 검색
try:
    company = corp_list.find_by_corp_name(company_name, exactly=True)[0]
    print(f"{company_name}의 기업코드: {company.corp_code}")
except IndexError:
    print("해당하는 회사명이 없습니다. 정확한 이름을 입력해주세요.")

회사명을 입력하세요:  NAVER
검색 시작일을 입력하세요 (예: 20230101):  20200101
검색 종료일을 입력하세요 (예: 20231231):  


NAVER의 기업코드: 00266961


In [29]:
corp_code, corp_name = search_company()
if corp_code:
    start_date, end_date = get_search_period()
    print(f"\n회사명: {corp_name}, 기업코드: {corp_code}")
    print(f"검색 기간: {start_date} ~ {end_date}")

회사명을 입력하세요:  씨제이



검색된 회사 목록:
1. 회사명: 씨제이골든빌리지, 기업코드: 00233796
2. 회사명: 씨제이프론티어전환형펀드, 기업코드: 00294849
3. 회사명: 씨제이코퍼레이션, 기업코드: 00259110
4. 회사명: 씨제이엔터테인먼트, 기업코드: 00576309
5. 회사명: 씨제이미디어, 기업코드: 00841452
6. 회사명: 네오디더블유씨제이차, 기업코드: 00636319
7. 회사명: 씨제이헬로비전중앙방송, 기업코드: 00410252
8. 회사명: 씨제이헬로비전해운대기장방송, 기업코드: 00504409
9. 회사명: 씨제이스틸, 기업코드: 00875954
10. 회사명: 씨제이엠키친, 기업코드: 00787109
11. 회사명: 씨제이투, 기업코드: 00923853
12. 회사명: 엔씨제이차, 기업코드: 00583150
13. 회사명: 씨제이시너빌, 기업코드: 00676131
14. 회사명: 씨제이헬로비전금정방송, 기업코드: 00618872
15. 회사명: 씨제이헬로비전영남방송, 기업코드: 00367570
16. 회사명: 씨제이티브이엔, 기업코드: 00520230
17. 회사명: 씨제이헬로비전드림씨티방송, 기업코드: 00246736
18. 회사명: 씨제이헬로비전충남방송, 기업코드: 00248947
19. 회사명: 씨제이인터넷, 기업코드: 00248691
20. 회사명: 씨제이에듀케이션즈, 기업코드: 00906403
21. 회사명: 씨제이지엘에스, 기업코드: 00206747
22. 회사명: 씨제이이엔지, 기업코드: 00890236
23. 회사명: 씨제이헬로비전영동방송, 기업코드: 00356255
24. 회사명: 코에프씨케이디비씨제이케이엘프런티어챔프2010의1호사모투자전문회사, 기업코드: 00870074
25. 회사명: 씨제이엔시티, 기업코드: 00559135
26. 회사명: 씨제이텍, 기업코드: 00518165
27. 회사명: 케이엘씨제이차유동화전문유한회사, 기업코드: 00829353
28. 회사명: 씨제이헬로비전신라방송, 기업코드: 00606266
2


원하는 회사 번호를 선택하세요:  124



선택된 회사: 케이에스에이치씨제이차, 기업코드: 01861499


검색 시작일을 입력하세요 (예: 20230101):  20200101
검색 종료일을 입력하세요 (예: 20231231):  20241210



회사명: 케이에스에이치씨제이차, 기업코드: 01861499
검색 기간: 20200101 ~ 20241210


In [30]:
# 공시유형이 정기공시 또는 주요사항보고 보고서 전부 가져오기
all_reports = []
page_no = 1

while True:
    reports = company.search_filings(bgn_de=start_date, end_de=end_date, pblntf_ty=['B'], last_reprt_at='Y', page_no=page_no)
    
    if not reports:
        break
    
    all_reports.extend(reports.report_list)  # 현재 페이지의 보고서를 리스트에 추가
    
    # 마지막 페이지면 종료
    if page_no >= reports.total_page:
        break
    
    page_no += 1

# 전체 보고서 개수 출력
final_report_count = len(all_reports)

print(f"보고서 수: {final_report_count}개")

# 최종 보고서 목록 출력
for idx, report in enumerate(all_reports):
    print(f"{idx + 1}. 접수번호: {report.rcept_no}, 보고서명: {report.report_nm}")


보고서 수: 22개
1. 접수번호: 20240930000001, 보고서명: 주요사항보고서(자기주식취득결정)
2. 접수번호: 20240701000614, 보고서명: 주요사항보고서(자기주식처분결정)
3. 접수번호: 20240326000771, 보고서명: 주요사항보고서(자기주식처분결정)
4. 접수번호: 20240102000389, 보고서명: 주요사항보고서(자기주식처분결정)
5. 접수번호: 20230703000391, 보고서명: 주요사항보고서(자기주식처분결정)
6. 접수번호: 20230403002421, 보고서명: 주요사항보고서(자기주식처분결정)
7. 접수번호: 20230331002921, 보고서명: [기재정정]주요사항보고서(자기주식처분결정)
8. 접수번호: 20230302000356, 보고서명: 주요사항보고서(자기주식처분결정)
9. 접수번호: 20230102000286, 보고서명: 주요사항보고서(자기주식처분결정)
10. 접수번호: 20220701000458, 보고서명: 주요사항보고서(자기주식처분결정)
11. 접수번호: 20220401002323, 보고서명: 주요사항보고서(자기주식처분결정)
12. 접수번호: 20220302000584, 보고서명: 주요사항보고서(자기주식처분결정)
13. 접수번호: 20220103000274, 보고서명: 주요사항보고서(자기주식처분결정)
14. 접수번호: 20210810000071, 보고서명: 주요사항보고서(자기주식처분결정)
15. 접수번호: 20210721000478, 보고서명: 주요사항보고서(회사합병결정)
16. 접수번호: 20210701000285, 보고서명: 주요사항보고서(자기주식처분결정)
17. 접수번호: 20210510000723, 보고서명: 주요사항보고서(자기주식처분결정)
18. 접수번호: 20210316000830, 보고서명: 주요사항보고서(자기주식처분결정)
19. 접수번호: 20210302003572, 보고서명: 주요사항보고서(자기주식처분결정)
20. 접수번호: 20210128000003, 보고서명: 주요사항보고서(자기주식

In [31]:
# 저장할 디렉토리 설정
save_dir = "./download"

for report in all_reports:
    rcept_no = report.rcept_no
    zip_file_path = download_zip_file(rcept_no, api_key, save_dir)
    
    if zip_file_path:
        try:
            # ZIP 파일이 유효한지 확인
            with zipfile.ZipFile(zip_file_path, 'r') as zf:
                extract_and_clean_xml(zip_file_path, save_dir)
        except zipfile.BadZipFile:
            print(f"{rcept_no}: ZIP 파일이 손상되었거나 올바르지 않습니다. 건너뜁니다.")
            continue
        
        # 클리닝된 HTML 파일 경로 생성
        html_file_path = os.path.join(save_dir, f"{rcept_no}_clean.html")

        # HTML 파일 존재 여부 확인
        if os.path.exists(html_file_path):
            # Step 2: HTML 파일 전처리 (메모리에서 처리)
            processed_text = preprocess_html_for_model(html_file_path)
        
            # Step 3: 테이블 태그 후처리 (메모리에서 처리)
            final_processed_content = remove_table_tags_but_keep_structure(processed_text)
        
            # 최종 결과를 파일로 저장
            output_file_path = os.path.join(save_dir, f"{rcept_no}_processed_text_for_model.txt")
            with open(output_file_path, 'w', encoding='utf-8') as f:
                f.write(final_processed_content)
        
            print(f"최종 전처리된 텍스트가 {output_file_path}에 저장되었습니다.\n")
        else:
            print(f"{rcept_no}: 클리닝된 HTML 파일이 존재하지 않습니다. 건너뜁니다.\n")
    else:
        print(f"{rcept_no}: ZIP 파일 다운로드에 실패했습니다.\n")

ZIP 파일이 ./download\20240930000001.zip에 성공적으로 저장되었습니다.
20240930000001.xml이(가) ./download\20240930000001_clean.html에 저장되었습니다.
최종 전처리된 텍스트가 ./download\20240930000001_processed_text_for_model.txt에 저장되었습니다.

ZIP 파일이 ./download\20240701000614.zip에 성공적으로 저장되었습니다.
20240701000614.xml이(가) ./download\20240701000614_clean.html에 저장되었습니다.
최종 전처리된 텍스트가 ./download\20240701000614_processed_text_for_model.txt에 저장되었습니다.

ZIP 파일이 ./download\20240326000771.zip에 성공적으로 저장되었습니다.
20240326000771.xml이(가) ./download\20240326000771_clean.html에 저장되었습니다.
최종 전처리된 텍스트가 ./download\20240326000771_processed_text_for_model.txt에 저장되었습니다.

ZIP 파일이 ./download\20240102000389.zip에 성공적으로 저장되었습니다.
20240102000389.xml이(가) ./download\20240102000389_clean.html에 저장되었습니다.
최종 전처리된 텍스트가 ./download\20240102000389_processed_text_for_model.txt에 저장되었습니다.

ZIP 파일이 ./download\20230703000391.zip에 성공적으로 저장되었습니다.
20230703000391.xml이(가) ./download\20230703000391_clean.html에 저장되었습니다.
최종 전처리된 텍스트가 ./download\20230703000391_processed_text_for_model.

## 보고서 요약

In [32]:
# 모든 보고서에 대해 처리 실행
process_reports(all_reports, save_dir, api_key)

통합 요약 결과가 ./download\20240930000001_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20240701000614_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20240326000771_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20240102000389_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20230703000391_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20230403002421_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20230331002921_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20230302000356_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20230102000286_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20220701000458_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20220401002323_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20220302000584_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20220103000274_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20210810000071_combined_summary.txt에 저장되었습니다.
통합 요약 결과가 ./download\20210721000478_combined_summary.txt에 저장되었