# 12차시: [실습] Open DART API로 기업 공시정보 자동 수집

## 학습 목표
- Open DART API를 활용하여 특정 기업의 사업 보고서, 공시 목록 등 원하는 공시 정보를 파이썬으로 자동 수집
- 여러 기업의 공시 정보를 일괄 수집하는 함수 작성
- 공시 데이터 필터링, 정제, 저장 및 분석

## 학습 내용
1. API 키 설정
2. 여러 기업 공시 목록 수집
3. 공시 정보 필터링 (기간, 유형)
4. 사업보고서 목록 조회
5. 데이터 정제 및 저장 (CSV/Excel)
6. 공시 통계 분석 및 시각화

## 구분
실습

---
이번 차시에서는 Open DART API를 활용하여 실제 기업 공시 정보를 자동으로 수집하고 분석하는 실습을 진행합니다.

In [None]:
# 라이브러리 설치
!pip install python-dotenv -q

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import requests
import time

def setup_korean_font_colab(verbose=True):
    """Colab 한글 폰트 설정"""
    import os
    import matplotlib as mpl
    import matplotlib.font_manager as fm
    font_path = "/usr/share/fonts/truetype/nanum/NanumGothic.ttf"
    if not os.path.exists(font_path):
        !apt-get update -qq
        !apt-get install -y fonts-nanum -qq
        pass
    try:
        fm.fontManager.addfont(font_path)
    except:
        pass
    font_name = fm.FontProperties(fname=font_path).get_name()
    mpl.rcParams["font.family"] = font_name
    mpl.rcParams["axes.unicode_minus"] = False
    if verbose:
        print(f"Korean font ready: {font_name}")
    return font_name

setup_korean_font_colab()

---
## 1. API 키 설정

11차시와 동일하게 `.env` 파일에서 API 키를 로드합니다.

In [None]:
# API 키 로드 (Colab에서 .env 파일 업로드)
from google.colab import files
import os
from dotenv import load_dotenv

print("[.env 파일 업로드]")
print("=" * 60)
print("로컬에 저장된 .env 파일을 선택해주세요.")
print("(DART_API_KEY가 포함된 파일)")
print()

uploaded = files.upload()

# .env 파일 로드
load_dotenv('.env')

DART_API_KEY = os.getenv('DART_API_KEY')

# API 키 로드 확인
print("\n[API 키 로드 상태]")
print("=" * 60)
if DART_API_KEY:
    print(f"DART API Key: 설정완료 ({DART_API_KEY[:8]}...)")
else:
    print("DART API Key: 미설정")
    print("⚠️ API 키가 없으면 실습을 진행할 수 없습니다.")

---
## 2. 여러 기업 공시 목록 수집

여러 기업의 공시 정보를 일괄 수집하는 함수를 작성합니다.

### 주요 기업 고유번호 (corp_code)
| 기업명 | 고유번호 | 종목코드 |
|--------|----------|----------|
| 삼성전자 | 00126380 | 005930 |
| SK하이닉스 | 00164742 | 000660 |
| NAVER | 00140878 | 035420 |
| LG화학 | 00164779 | 051910 |

In [None]:
# 기업 고유번호 사전
corp_codes = {
    "00126380": "삼성전자",
    "00164742": "SK하이닉스",
    "00140878": "NAVER",
    "00164779": "LG화학"
}

print("[분석 대상 기업]")
print("=" * 60)
for code, name in corp_codes.items():
    print(f"  - {name}: {code}")

In [None]:
# 여러 기업 공시 수집 함수
def collect_disclosures(corp_codes_dict, start_date, end_date, page_count=100):
    """
    여러 기업의 공시 목록을 수집하는 함수
    
    Parameters:
    -----------
    corp_codes_dict : dict
        {고유번호: 기업명} 형식의 딕셔너리
    start_date : str
        시작일 (YYYYMMDD)
    end_date : str
        종료일 (YYYYMMDD)
    page_count : int
        페이지당 조회 건수 (기본값: 100)
    
    Returns:
    --------
    DataFrame : 모든 기업의 공시 정보를 합친 DataFrame
    """
    all_disclosures = []
    
    print(f"[공시 수집 기간: {start_date} ~ {end_date}]")
    print("=" * 60)
    
    url = "https://opendart.fss.or.kr/api/list.json"
    
    for corp_code, corp_name in corp_codes_dict.items():
        params = {
            "crtfc_key": DART_API_KEY,
            "corp_code": corp_code,
            "bgn_de": start_date,
            "end_de": end_date,
            "page_count": str(page_count)
        }
        
        try:
            response = requests.get(url, params=params)
            
            if response.status_code == 200:
                data = response.json()
                
                if data['status'] == '000':
                    disclosures = data['list']
                    print(f"  ✓ {corp_name}: {len(disclosures)}건 수집")
                    
                    # 기업명 추가
                    for item in disclosures:
                        item['corp_name'] = corp_name
                    
                    all_disclosures.extend(disclosures)
                else:
                    print(f"  ✗ {corp_name}: 오류 - {data.get('message', '')}")
            else:
                print(f"  ✗ {corp_name}: HTTP 오류 - {response.status_code}")
            
            # API 호출 제한 고려 (0.1초 대기)
            time.sleep(0.1)
            
        except Exception as e:
            print(f"  ✗ {corp_name}: 예외 발생 - {e}")
    
    # DataFrame으로 변환
    if all_disclosures:
        df = pd.DataFrame(all_disclosures)
        print(f"\n총 {len(df)}건의 공시 정보 수집 완료")
        return df
    else:
        print("\n수집된 공시 정보가 없습니다.")
        return pd.DataFrame()

In [None]:
# 최근 3개월 공시 정보 수집
end_date = datetime.now()
start_date = end_date - timedelta(days=90)

start_str = start_date.strftime('%Y%m%d')
end_str = end_date.strftime('%Y%m%d')

print(f"[최근 3개월 공시 정보 수집]")
print(f"기간: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}")
print()

df_disclosures = collect_disclosures(corp_codes, start_str, end_str, page_count=100)

In [None]:
# 수집된 공시 정보 확인
if len(df_disclosures) > 0:
    print("\n[수집된 공시 정보 샘플]")
    print("=" * 60)
    
    # 주요 컬럼만 선택
    display_cols = ['rcept_dt', 'corp_name', 'report_nm', 'pblntf_ty', 'flr_nm']
    df_display = df_disclosures[display_cols].copy()
    df_display.columns = ['접수일자', '기업명', '보고서명', '공시유형', '제출인']
    
    print(f"총 {len(df_display)}건")
    print("\n처음 10건:")
    print(df_display.head(10).to_string(index=False))
else:
    print("수집된 데이터가 없습니다.")

---
## 3. 공시 정보 필터링 (기간, 유형)

수집한 공시 정보를 기간, 유형 등으로 필터링합니다.

In [None]:
# 공시 유형 코드 설명
print("[공시 유형 코드]")
print("=" * 60)

pblntf_types = {
    "A": "정기보고서 (사업/반기/분기보고서)",
    "B": "주요사항보고",
    "C": "발행공시",
    "D": "지분공시",
    "E": "기타공시",
    "F": "외부감사관련",
    "G": "펀드공시",
    "H": "자산유동화",
    "I": "거래소공시",
    "J": "공정위공시"
}

for code, desc in pblntf_types.items():
    print(f"  - {code}: {desc}")

In [None]:
# 공시 유형별 통계
if len(df_disclosures) > 0:
    print("\n[공시 유형별 통계]")
    print("=" * 60)
    
    type_counts = df_disclosures['pblntf_ty'].value_counts()
    
    for pblntf_ty, count in type_counts.items():
        type_name = pblntf_types.get(pblntf_ty, "알 수 없음")
        print(f"  {pblntf_ty} ({type_name}): {count}건")
    
    # 시각화
    plt.figure(figsize=(10, 6))
    type_counts.plot(kind='bar', color='steelblue', edgecolor='black')
    plt.title('공시 유형별 건수', fontsize=14, fontweight='bold')
    plt.xlabel('공시 유형', fontsize=12)
    plt.ylabel('건수', fontsize=12)
    plt.xticks(rotation=45, ha='right')
    plt.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    plt.show()

In [None]:
# 정기보고서만 필터링
if len(df_disclosures) > 0:
    print("\n[정기보고서만 필터링]")
    print("=" * 60)
    
    df_annual = df_disclosures[df_disclosures['pblntf_ty'] == 'A'].copy()
    
    print(f"정기보고서: {len(df_annual)}건")
    
    if len(df_annual) > 0:
        print("\n정기보고서 목록:")
        df_annual_display = df_annual[['rcept_dt', 'corp_name', 'report_nm']].copy()
        df_annual_display.columns = ['접수일자', '기업명', '보고서명']
        print(df_annual_display.to_string(index=False))

In [None]:
# 기업별 공시 건수 분석
if len(df_disclosures) > 0:
    print("\n[기업별 공시 건수]")
    print("=" * 60)
    
    corp_counts = df_disclosures['corp_name'].value_counts()
    
    for corp_name, count in corp_counts.items():
        print(f"  {corp_name}: {count}건")
    
    # 시각화
    plt.figure(figsize=(10, 5))
    corp_counts.plot(kind='barh', color='navy', edgecolor='black')
    plt.title('기업별 공시 건수', fontsize=14, fontweight='bold')
    plt.xlabel('건수', fontsize=12)
    plt.ylabel('기업명', fontsize=12)
    plt.grid(axis='x', alpha=0.3)
    plt.tight_layout()
    plt.show()

---
## 4. 사업보고서 목록 조회

정기보고서 중에서도 사업보고서만 별도로 조회합니다.

In [None]:
# 사업보고서만 필터링 (report_nm에 "사업보고서" 포함)
if len(df_disclosures) > 0:
    print("[사업보고서 목록]")
    print("=" * 60)
    
    df_business = df_disclosures[
        (df_disclosures['pblntf_ty'] == 'A') & 
        (df_disclosures['report_nm'].str.contains('사업보고서', na=False))
    ].copy()
    
    print(f"사업보고서: {len(df_business)}건\n")
    
    if len(df_business) > 0:
        df_business_display = df_business[['rcept_dt', 'corp_name', 'report_nm']].copy()
        df_business_display.columns = ['접수일자', '기업명', '보고서명']
        df_business_display = df_business_display.sort_values('접수일자', ascending=False)
        print(df_business_display.to_string(index=False))
    else:
        print("최근 3개월 내 사업보고서가 없습니다.")
        print("(사업보고서는 보통 연 1회 제출되므로 기간을 늘려야 할 수 있습니다)")

In [None]:
# 연도별 사업보고서 조회 (더 긴 기간)
print("\n[최근 1년간 사업보고서 조회]")
print("=" * 60)

end_date_year = datetime.now()
start_date_year = end_date_year - timedelta(days=365)

start_str_year = start_date_year.strftime('%Y%m%d')
end_str_year = end_date_year.strftime('%Y%m%d')

df_disclosures_year = collect_disclosures(corp_codes, start_str_year, end_str_year, page_count=100)

if len(df_disclosures_year) > 0:
    df_business_year = df_disclosures_year[
        (df_disclosures_year['pblntf_ty'] == 'A') & 
        (df_disclosures_year['report_nm'].str.contains('사업보고서', na=False))
    ].copy()
    
    print(f"\n사업보고서: {len(df_business_year)}건\n")
    
    if len(df_business_year) > 0:
        df_business_year_display = df_business_year[['rcept_dt', 'corp_name', 'report_nm']].copy()
        df_business_year_display.columns = ['접수일자', '기업명', '보고서명']
        df_business_year_display = df_business_year_display.sort_values('접수일자', ascending=False)
        print(df_business_year_display.to_string(index=False))

---
## 5. 데이터 정제 및 저장 (CSV/Excel)

수집한 공시 정보를 정제하고 CSV/Excel 파일로 저장합니다.

In [None]:
# 데이터 정제
if len(df_disclosures) > 0:
    print("[데이터 정제]")
    print("=" * 60)
    
    # 필요한 컬럼만 선택
    df_clean = df_disclosures[[
        'rcept_dt', 'corp_name', 'stock_code', 'corp_code',
        'report_nm', 'pblntf_ty', 'flr_nm', 'rm'
    ]].copy()
    
    # 컬럼명 한글화
    df_clean.columns = [
        '접수일자', '기업명', '종목코드', '고유번호',
        '보고서명', '공시유형', '제출인', '비고'
    ]
    
    # 접수일자를 날짜 형식으로 변환
    df_clean['접수일자'] = pd.to_datetime(df_clean['접수일자'])
    
    # 공시유형 한글명 추가
    df_clean['공시유형명'] = df_clean['공시유형'].map(pblntf_types)
    
    # 정렬 (최신순)
    df_clean = df_clean.sort_values('접수일자', ascending=False).reset_index(drop=True)
    
    print(f"정제 완료: {len(df_clean)}건")
    print(f"\n컬럼: {df_clean.columns.tolist()}")
    print(f"\n샘플 데이터:")
    print(df_clean.head().to_string(index=False))

In [None]:
# CSV 파일로 저장
if len(df_disclosures) > 0:
    print("\n[CSV 파일 저장]")
    print("=" * 60)
    
    csv_filename = f"공시정보_{start_str}_{end_str}.csv"
    df_clean.to_csv(csv_filename, index=False, encoding='utf-8-sig')
    
    print(f"저장 완료: {csv_filename}")
    print(f"파일 크기: {os.path.getsize(csv_filename) / 1024:.2f} KB")

In [None]:
# Excel 파일로 저장
if len(df_disclosures) > 0:
    print("\n[Excel 파일 저장]")
    print("=" * 60)
    
    excel_filename = f"공시정보_{start_str}_{end_str}.xlsx"
    
    with pd.ExcelWriter(excel_filename, engine='openpyxl') as writer:
        # 전체 공시 정보
        df_clean.to_excel(writer, sheet_name='전체공시', index=False)
        
        # 정기보고서만
        if len(df_annual) > 0:
            df_annual_clean = df_clean[df_clean['공시유형'] == 'A'].copy()
            df_annual_clean.to_excel(writer, sheet_name='정기보고서', index=False)
        
        # 기업별 요약
        summary = df_clean.groupby('기업명').agg({
            '접수일자': 'count',
            '공시유형': lambda x: x.value_counts().to_dict()
        }).reset_index()
        summary.columns = ['기업명', '총건수', '유형별건수']
        summary.to_excel(writer, sheet_name='기업별요약', index=False)
    
    print(f"저장 완료: {excel_filename}")
    print(f"파일 크기: {os.path.getsize(excel_filename) / 1024:.2f} KB")

---
## 6. 공시 통계 분석 및 시각화

수집한 공시 정보를 분석하고 시각화합니다.

In [None]:
# 월별 공시 건수 분석
if len(df_disclosures) > 0:
    print("[월별 공시 건수 분석]")
    print("=" * 60)
    
    # 접수일자에서 연월 추출
    df_clean['연월'] = df_clean['접수일자'].dt.to_period('M')
    
    monthly_counts = df_clean.groupby('연월').size()
    
    print("\n월별 공시 건수:")
    for period, count in monthly_counts.items():
        print(f"  {period}: {count}건")
    
    # 시각화
    plt.figure(figsize=(12, 5))
    monthly_counts.plot(kind='line', marker='o', linewidth=2, markersize=8, color='navy')
    plt.title('월별 공시 건수 추이', fontsize=14, fontweight='bold')
    plt.xlabel('연월', fontsize=12)
    plt.ylabel('건수', fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

In [None]:
# 기업별 공시 유형 분포
if len(df_disclosures) > 0:
    print("\n[기업별 공시 유형 분포]")
    print("=" * 60)
    
    # 기업별, 유형별 건수
    corp_type_counts = pd.crosstab(df_clean['기업명'], df_clean['공시유형명'])
    
    print(corp_type_counts)
    
    # 시각화 (스택 바 차트)
    plt.figure(figsize=(10, 6))
    corp_type_counts.plot(kind='bar', stacked=True, figsize=(12, 6), colormap='Set3')
    plt.title('기업별 공시 유형 분포', fontsize=14, fontweight='bold')
    plt.xlabel('기업명', fontsize=12)
    plt.ylabel('건수', fontsize=12)
    plt.legend(title='공시 유형', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.xticks(rotation=45, ha='right')
    plt.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    plt.show()

In [None]:
# 최근 공시 Top 10
if len(df_disclosures) > 0:
    print("\n[최근 공시 Top 10]")
    print("=" * 60)
    
    top10 = df_clean.head(10)[['접수일자', '기업명', '보고서명', '공시유형명']].copy()
    print(top10.to_string(index=False))

---
## 학습 정리

### 1. 여러 기업 공시 수집
- `collect_disclosures()` 함수로 여러 기업의 공시 정보를 일괄 수집
- API 호출 제한을 고려한 대기 시간 설정 (`time.sleep()`)

### 2. 공시 정보 필터링
- 공시 유형별 필터링 (`pblntf_ty`)
- 기간별 필터링
- 보고서명으로 필터링 (사업보고서 등)

### 3. 데이터 정제
- 필요한 컬럼만 선택
- 컬럼명 한글화
- 날짜 형식 변환
- 정렬 및 인덱스 리셋

### 4. 데이터 저장
- CSV 파일 저장 (`to_csv()`)
- Excel 파일 저장 (`to_excel()`, 여러 시트)

### 5. 통계 분석 및 시각화
- 월별 공시 건수 추이
- 기업별 공시 유형 분포
- Matplotlib으로 시각화

### 핵심 함수
```python
collect_disclosures(corp_codes_dict, start_date, end_date, page_count)
```

---

### 다음 차시 예고
- 13차시: [실습] FRED API로 경제지표 수집
  - 미국 경제지표 자동 수집
  - 여러 지표 비교 분석