In [52]:
# !pip install dart-fss
!pip -q install -U dart-fss pandas tqdm tenacity

In [70]:
# !pip install -U dart-fss pandas openpyxl tqdm

import os, re, time
from dataclasses import dataclass
from typing import Optional, Dict
from dotenv import load_dotenv

import pandas as pd
from tqdm import tqdm
import dart_fss as dart

# =========================
# 0) Open DART API KEY 설정
# =========================
load_dotenv()
api_key = os.getenv("DART_API_KEY")
if api_key is not None:
    dart.set_api_key(api_key=api_key)
else:
    raise RuntimeError("DART_API_KEY 환경변수가 설정되지 않았습니다.")

SLEEP = 0.15  # API 호출 간격

# =========================
# 1) 대상 기업(표시명) 목록
# =========================
TARGET_NAMES = [
    "삼성물산","효성중공업","현대건설","HJ중공업","DL이앤씨",
    "GS건설","대우건설","HDC현대산업개발","아이에스동서","태영건설"
]

@dataclass
class CorpRef:
    corp_code: str
    corp_name: str
    stock_code: Optional[str] = None

# 고정 매핑으로 시작(초기값), 이후 앵커링 단계에서 교체/보정됨
corp_refs: Dict[str, CorpRef] = {
    "삼성물산": CorpRef("00126229", "삼성물산", "000830"),   # 이후 028260으로 교체 앵커링
    "효성중공업": CorpRef("01316245", "효성중공업", "298040"),
    "현대건설": CorpRef("00164478", "현대건설", "000720"),
    "HJ중공업": CorpRef("00633835", "HJ중공업", "097230"),
    "DL이앤씨": CorpRef("01524093", "DL이앤씨", "375500"),
    "GS건설": CorpRef("00120030", "GS건설", "006360"),
    "대우건설": CorpRef("00124540", "대우건설", "047040"),
    "HDC현대산업개발": CorpRef("01310269", "HDC현대산업개발", "294870"),
    "아이에스동서": CorpRef("00115977", "아이에스동서", "010780"),
    "태영건설": CorpRef("00535825", "태영건설", None),      # 이후 009410으로 교체 앵커링
}

# ==========================================
# 2) 보고서 코드 / 분기말 / 계정명 정규화 등
# ==========================================
REPRT = {"1Q":"11013","H1":"11012","3Q":"11014","ANNUAL":"11011"}
ORDER = ["1Q","H1","3Q","ANNUAL"]
Q_END = {"11013":"-03-31","11012":"-06-30","11014":"-09-30","11011":"-12-31"}

NOTE_TAIL = re.compile(r"\s*\([^)]*\)\s*\)*\s*$")
MULTISPACE = re.compile(r"\s+")
def clean_account_name(s: str) -> str:
    if not isinstance(s, str):
        return s
    s = s.strip()
    s = s.replace("수익 (매출액)", "매출액")
    s = NOTE_TAIL.sub("", s)
    s = MULTISPACE.sub(" ", s)
    s = s.replace("영업이익(손실)", "영업이익")
    s = s.replace("분기(중간)순이익", "분기순이익")
    s = s.replace("당기순이익(손실)", "당기순이익")
    return s

TARGET_ACCOUNTS = {
    "자산총계": ["자산총계", "총자산"],
    "부채총계": ["부채총계", "총부채"],
    "자본총계": [
        "자본총계", "총자본", "자본총계(지배+비지배)"
    ],
    # 손익(가능하면 보수적으로 우선순위)
    "매출액": ["매출액", "매출", "매출수익", "영업수익", "수익"],
    "영업이익": ["영업이익", "영업손익"],
    "당기순이익": ["분기순이익", "당기순이익", "반기순이익"],
}

def parse_amount(x):
    if x is None:
        return None
    if isinstance(x, (int, float)):
        return float(x)
    s = str(x).strip()
    if s in ("", "-", "None", "nan"):
        return None
    s = s.replace(",", "")
    if s.startswith("(") and s.endswith(")"):  # (1,234) → -1234
        s = "-" + s[1:-1]
    try:
        return float(s)
    except:
        return None

# =========================================================
# 3) 주식코드로 정확한 corp_code를 찾는 앵커링 함수
# =========================================================
def _resolve_by_stock(stock_code: str):
    """
    주식코드로 corp_list를 순회해서 정확한 corp_code/현재공식명칭을 찾음
    """
    cl = dart.get_corp_list()
    for c in cl:
        if getattr(c, "stock_code", None) == stock_code:
            return str(getattr(c, "corp_code")).zfill(8), getattr(c, "corp_name", "")
    raise ValueError(f"stock_code '{stock_code}' 회사를 찾지 못했습니다.")

# ---------------------------------------------------------
# 메인 표시명 중 삼성물산/태영건설을 올바른 엔트리로 교체 앵커링
#   - 삼성물산: 028260 (합병 후 신주)
#   - 태영건설: 009410
# ---------------------------------------------------------
PRIMARY_STOCK_ANCHOR = {
    "삼성물산": "028260",
    "태영건설": "009410",
}
for nm, sc in PRIMARY_STOCK_ANCHOR.items():
    try:
        cc, now_official = _resolve_by_stock(sc)
        corp_refs[nm] = CorpRef(cc, nm, sc)  # 표시명은 그대로 유지
        print(f"[anchor primary] {nm} -> corp_code={cc}, stock_code={sc}, now_official='{now_official}'")
    except Exception as e:
        print(f"[warn] anchor primary {nm}: {e}")

# ---------------------------------------------------------
# 레거시 명칭(라인리지)도 주식코드로 고정
#   - 대림산업: 000210
#   - 현대산업개발: 012630
# ---------------------------------------------------------
LEGACY_STOCK_ANCHOR = {
    "대림산업": "000210",
    "현대산업개발": "012630",
}
for legacy_name, sc in LEGACY_STOCK_ANCHOR.items():
    try:
        cc, now_official = _resolve_by_stock(sc)
        corp_refs[legacy_name] = CorpRef(cc, legacy_name, sc)
        print(f"[anchor legacy] {legacy_name} -> corp_code={cc}, stock_code={sc}, now_official='{now_official}'")
    except Exception as e:
        print(f"[warn] anchor legacy {legacy_name}: {e}")

# ==========================================
# 4) FS 고정 + sj_div 필터 + 계정 매칭 강화 버전
# ==========================================
def fetch_report_accounts_strict(corp_code: str, year: int, reprt_code: str, fs_div: str):
    """
    단일 보고서 로드 (fs_div 고정: 'CFS' 또는 'OFS')
    - 손익 항목은 IS/CIS에서만, 재무상태 항목은 BS에서만 매칭
    - 반환: 타겟 6계정 + report_date
    """
    def _call(cc, yy, rc, div):
        time.sleep(SLEEP)
        return dart.api.finance.fnltt_singl_acnt_all(
            corp_code=str(cc).zfill(8), bsns_year=str(yy), reprt_code=rc, fs_div=div
        )

    resp = _call(corp_code, year, reprt_code, fs_div)
    items = resp.get("list", []) if isinstance(resp, dict) else (resp or [])
    df = pd.DataFrame(items)
    if df.empty:
        return {k: None for k in TARGET_ACCOUNTS} | {"report_date": f"{year}{Q_END[reprt_code]}"}

    # 정규화
    df["account_nm_clean"] = df["account_nm"].astype(str).map(clean_account_name)
    if "sj_div" not in df.columns:
        df["sj_div"] = None
    amt_col = "thstrm_amount" if "thstrm_amount" in df.columns else None

    # 보고일
    if "thstrm_dt" in df.columns and pd.notna(df["thstrm_dt"]).any():
        rpt_dt = str(df["thstrm_dt"].dropna().iloc[0])
    else:
        rpt_dt = f"{year}{Q_END[reprt_code]}"

    # 필터 함수: IS/CIS/BS
    def pick_amount(cands, allowed_sj=("IS","CIS","BS")):
        sub = df[df["account_nm_clean"].isin(cands)]
        if allowed_sj is not None:
            sub = sub[sub["sj_div"].isin(allowed_sj)]
        if sub.empty or not amt_col:
            return None
        row = sub.sort_values("ord").iloc[0] if "ord" in sub.columns else sub.iloc[0]
        return parse_amount(row[amt_col])

    out = {}
    # 재무상태표: BS만
    out["자산총계"] = pick_amount(TARGET_ACCOUNTS["자산총계"], allowed_sj=("BS",))
    out["부채총계"] = pick_amount(TARGET_ACCOUNTS["부채총계"], allowed_sj=("BS",))
    out["자본총계"] = pick_amount(TARGET_ACCOUNTS["자본총계"], allowed_sj=("BS",))
    # 손익계정: IS/CIS만
    out["매출액"]   = pick_amount(TARGET_ACCOUNTS["매출액"],   allowed_sj=("IS","CIS"))
    out["영업이익"] = pick_amount(TARGET_ACCOUNTS["영업이익"], allowed_sj=("IS","CIS"))
    out["당기순이익"] = pick_amount(TARGET_ACCOUNTS["당기순이익"], allowed_sj=("IS","CIS"))

    out["report_date"] = rpt_dt
    return out

def _probe_fs_div_for_year(corp_code: str, year: int) -> str:
    """
    연도 내 대표 보고서(우선 ANNUAL, 없으면 3Q/H1/1Q 순)에서 CFS 가용 여부 확인.
    CFS 가 하나라도 있으면 CFS 고정, 아니면 OFS.
    """
    probe_order = [REPRT["ANNUAL"], REPRT["3Q"], REPRT["H1"], REPRT["1Q"]]
    for rc in probe_order:
        try:
            time.sleep(SLEEP)
            r = dart.api.finance.fnltt_singl_acnt_all(
                corp_code=str(corp_code).zfill(8), bsns_year=str(year), reprt_code=rc, fs_div="CFS"
            )
            lst = r.get("list", []) if isinstance(r, dict) else (r or [])
            if lst:
                return "CFS"
        except Exception:
            pass
    return "OFS"

def yearly_quarter_table_with_fs_lock(corp_name: str, corp_code: str, year: int) -> pd.DataFrame:
    """
    (중요) 연도별 FS 기준을 CFS/OFS 중 하나로 '고정'하여 1Q/H1/3Q/ANNUAL 전체를 조회.
    이후 누적→분기 변환(Q2, Q4 차분) 수행.
    """
    fs_fixed = _probe_fs_div_for_year(corp_code, year)

    raw, dates = {}, {}
    for tag in ORDER:
        try:
            rec = fetch_report_accounts_strict(corp_code, year, REPRT[tag], fs_div=fs_fixed)
        except Exception:
            rec = {k: None for k in TARGET_ACCOUNTS} | {"report_date": f"{year}{Q_END[REPRT[tag]]}"}
        raw[tag] = rec
        dates[tag] = rec.get("report_date")

    def d(a, b):
        return None if (a is None or b is None) else a - b

    # 손익(누적 → 분기)
    sales_q = {
        "Q1": raw["1Q"]["매출액"],
        "Q2": d(raw["H1"]["매출액"], raw["1Q"]["매출액"]),
        "Q3": d(raw["3Q"]["매출액"], raw["H1"]["매출액"]),
        "Q4": d(raw["ANNUAL"]["매출액"], raw["3Q"]["매출액"]),
    }
    op_q = {
        "Q1": raw["1Q"]["영업이익"],
        "Q2": d(raw["H1"]["영업이익"], raw["1Q"]["영업이익"]),
        "Q3": d(raw["3Q"]["영업이익"], raw["H1"]["영업이익"]),
        "Q4": d(raw["ANNUAL"]["영업이익"], raw["3Q"]["영업이익"]),
    }
    ni_q = {
        "Q1": raw["1Q"]["당기순이익"],
        "Q2": d(raw["H1"]["당기순이익"], raw["1Q"]["당기순이익"]),
        "Q3": d(raw["3Q"]["당기순이익"], raw["H1"]["당기순이익"]),
        "Q4": d(raw["ANNUAL"]["당기순이익"], raw["3Q"]["당기순이익"]),
    }

    # 재무상태표(분기말 잔액 그대로)
    assets_q = {"Q1": raw["1Q"]["자산총계"], "Q2": raw["H1"]["자산총계"], "Q3": raw["3Q"]["자산총계"], "Q4": raw["ANNUAL"]["자산총계"]}
    liab_q   = {"Q1": raw["1Q"]["부채총계"], "Q2": raw["H1"]["부채총계"], "Q3": raw["3Q"]["부채총계"], "Q4": raw["ANNUAL"]["부채총계"]}
    equity_q = {"Q1": raw["1Q"]["자본총계"], "Q2": raw["H1"]["자본총계"], "Q3": raw["3Q"]["자본총계"], "Q4": raw["ANNUAL"]["자본총계"]}
    date_q   = {"Q1": dates["1Q"], "Q2": dates["H1"], "Q3": dates["3Q"], "Q4": dates["ANNUAL"]}

    rows = []
    for q in ["Q1","Q2","Q3","Q4"]:
        rows.append({
            "corp_name": corp_name,
            "corp_code": str(corp_code).zfill(8),
            "year": year,
            "quarter": q,
            "report_date": date_q[q],
            "자산총계": assets_q[q],
            "부채총계": liab_q[q],
            "자본총계": equity_q[q],
            "매출액": sales_q[q],
            "영업이익": op_q[q],
            "분기순이익": ni_q[q],
        })
    return pd.DataFrame(rows)

# ==========================================
# 5) 라인리지 폴백 (DL/HDC만 적용) + FS고정 함수 사용
# ==========================================
LINEAGE_RULES = {
    "DL이앤씨": ("대림산업", 2020),         # 2020년까지 레거시 '대림산업' 사용
    "HDC현대산업개발": ("현대산업개발", 2017), # 2017년까지 레거시 '현대산업개발' 사용
}

def yearly_quarter_table_with_lineage(pretty_name: str, corp_code: str, year: int) -> pd.DataFrame:
    base = yearly_quarter_table_with_fs_lock(pretty_name, corp_code, year)  #  FS고정 버전 사용
    num_cols = ["자산총계","부채총계","자본총계","매출액","영업이익","분기순이익"]
    all_missing = base[num_cols].isna().all().all()

    # 레거시 폴백: 해당 연도 전체가 결측이고 룰에 걸리면 레거시 주식코드 앵커로 다시 조회
    if all_missing:
        rule = LINEAGE_RULES.get(pretty_name)
        if rule and year <= rule[1]:
            legacy_name = rule[0]
            if legacy_name not in corp_refs:
                raise RuntimeError(f"legacy '{legacy_name}' corp_refs 미정의")
            alt = yearly_quarter_table_with_fs_lock(legacy_name, corp_refs[legacy_name].corp_code, year)
            alt["corp_name"] = pretty_name
            alt["corp_code"] = str(corp_code).zfill(8)
            return alt

    return base

# ==========================================
# 6) 2014~2025 로드 → 2015~2025 저장
# ==========================================
START_YEAR_LOAD, END_YEAR = 2014, 2025
NUM_COLS = ["자산총계","부채총계","자본총계","매출액","영업이익","분기순이익"]

all_dfs = []
for nm in tqdm(TARGET_NAMES, desc="Companies"):
    ref = corp_refs[nm]
    for yy in range(START_YEAR_LOAD, END_YEAR+1):
        try:
            df_y = yearly_quarter_table_with_lineage(nm, ref.corp_code, yy)
            all_dfs.append(df_y)
        except Exception as e:
            print(f"[warn] {nm} {yy}: {e}")

result_df = pd.concat(all_dfs, ignore_index=True)
result_df = result_df[result_df["year"] >= 2015].copy()  # 2015~2025만 사용

COL_ORDER = ["corp_name","corp_code","year","quarter","report_date"] + NUM_COLS
result_df = result_df[COL_ORDER].sort_values(["corp_name","year","quarter"]).reset_index(drop=True)

# --- 2015년 1~3분기 및 2025년 3~4분기 행 제거 (마스크 분리) ---
mask_2015_q1_q3 = (result_df["year"] == 2015) & (result_df["quarter"].isin(["Q1", "Q2", "Q3"]))
mask_2025_q3_q4 = (result_df["year"] == 2025) & (result_df["quarter"].isin(["Q3", "Q4"]))
to_drop_mask = mask_2015_q1_q3 | mask_2025_q3_q4

# 제거 대상 행 확인 (선택적)
to_drop = result_df.loc[to_drop_mask, ["corp_name", "year", "quarter"]].copy()

# 실제 제거
result_df = result_df.loc[~to_drop_mask].reset_index(drop=True)

# 정렬 유지
result_df = result_df.sort_values(["corp_name", "year", "quarter"]).reset_index(drop=True)

# ==========================================
# 7) 저장 (CSV / XLSX)
# ==========================================
os.makedirs("./dart_out", exist_ok=True)
csv_path  = "./dart_out/건설10_11년로드_2015~2025_연결_분기재무_정규화.csv"
xlsx_path = "./dart_out/건설10_11년로드_2015~2025_연결_분기재무_정규화.xlsx"

result_df.to_csv(csv_path, index=False, encoding="utf-8-sig")
with pd.ExcelWriter(xlsx_path) as w:
    result_df.to_excel(w, sheet_name="quarterly", index=False)

print(result_df.shape, "Saved:", csv_path, xlsx_path)

# ==========================================
# 8) 결측 요약 및 샘플
# ==========================================
missing = (
    result_df
    .assign(_miss=result_df[NUM_COLS].isna().all(axis=1))
    .query("_miss == True")[["corp_name","year","quarter"]]
    .groupby(["corp_name","year"]).agg(missing_quarters=("quarter","unique")).reset_index()
    .sort_values(["corp_name","year"])
)
print("\n== 결측 요약 (2015~2025) ==")
try:
    display(missing.head(30))
except Exception:
    print(missing.head(30).to_string(index=False))

# 샘플
try:
    display(result_df.head(12))
except Exception:
    print(result_df.head(12).to_string(index=False))


[anchor primary] 삼성물산 -> corp_code=00149655, stock_code=028260, now_official='삼성물산'
[anchor primary] 태영건설 -> corp_code=00153861, stock_code=009410, now_official='태영건설'
[anchor legacy] 대림산업 -> corp_code=00109693, stock_code=000210, now_official='DL'
[anchor legacy] 현대산업개발 -> corp_code=00164636, stock_code=012630, now_official='HDC'


Companies: 100%|██████████| 10/10 [05:00<00:00, 30.10s/it]
  result_df = pd.concat(all_dfs, ignore_index=True)


(390, 11) Saved: ./dart_out/건설10_11년로드_2015~2025_연결_분기재무_정규화.csv ./dart_out/건설10_11년로드_2015~2025_연결_분기재무_정규화.xlsx

== 결측 요약 (2015~2025) ==


Unnamed: 0,corp_name,year,missing_quarters
0,HDC현대산업개발,2015,[Q4]
1,HDC현대산업개발,2016,"[Q1, Q2, Q3, Q4]"
2,HDC현대산업개발,2017,"[Q1, Q2]"
3,HDC현대산업개발,2018,[Q1]
4,효성중공업,2015,[Q4]
5,효성중공업,2016,"[Q1, Q2, Q3, Q4]"
6,효성중공업,2017,"[Q1, Q2, Q3, Q4]"
7,효성중공업,2018,[Q1]


Unnamed: 0,corp_name,corp_code,year,quarter,report_date,자산총계,부채총계,자본총계,매출액,영업이익,분기순이익
0,DL이앤씨,1524093,2015,Q4,2015-12-31,12064890000000.0,7259125000000.0,4805769000000.0,,,
1,DL이앤씨,1524093,2016,Q1,2016-03-31,12332080000000.0,7389715000000.0,4942363000000.0,2253709000000.0,90775870000.0,31037070000.0
2,DL이앤씨,1524093,2016,Q2,2016-06-30,12378280000000.0,7324959000000.0,5053324000000.0,310076900000.0,45397330000.0,88759550000.0
3,DL이앤씨,1524093,2016,Q3,2016-09-30,12185420000000.0,7042494000000.0,5142928000000.0,-106422300000.0,-5518794000.0,-10709090000.0
4,DL이앤씨,1524093,2016,Q4,2016-12-31,12391510000000.0,7246135000000.0,5145374000000.0,7396406000000.0,288733300000.0,184107800000.0
5,DL이앤씨,1524093,2017,Q1,2017-03-31,12812120000000.0,7563966000000.0,5248157000000.0,2511359000000.0,113983700000.0,149346100000.0
6,DL이앤씨,1524093,2017,Q2,2017-06-30,13206280000000.0,7818788000000.0,5387496000000.0,594928500000.0,29052860000.0,-44812440000.0
7,DL이앤씨,1524093,2017,Q3,2017-09-30,14137480000000.0,8398251000000.0,5739226000000.0,320920300000.0,54249170000.0,191409400000.0
8,DL이앤씨,1524093,2017,Q4,2017-12-31,13402450000000.0,7708148000000.0,5694306000000.0,8908328000000.0,348613100000.0,212055700000.0
9,DL이앤씨,1524093,2018,Q1,2018-03-31,13689850000000.0,8038344000000.0,5651507000000.0,2836063000000.0,248241700000.0,250513400000.0


In [None]:
# =========================
# 수기입력 데이터 병합
# =========================

# 수기입력 데이터 로드
manual_data = pd.read_csv('dart_out/수기입력_dart_결측치.csv')
print(f"수기입력 데이터 로드 완료: {manual_data.shape}")

print(f"병합 전 결측치 개수: {result_df.isnull().sum().sum()}")

# 공통 컬럼으로 병합 (corp_name, year, quarter 기준)
common_cols = ['corp_name', 'year', 'quarter']
financial_cols = ['자산총계', '부채총계', '자본총계', '매출액', '영업이익', '분기순이익']

result_df_updated = result_df.merge(
    manual_data[common_cols + financial_cols], 
    on=common_cols, 
    how='left', 
    suffixes=('', '_manual')
)

# 결측치가 있는 곳에 수기입력 데이터로 보완
for col in financial_cols:
    if f'{col}_manual' in result_df_updated.columns:
        result_df_updated[col] = result_df_updated[col].fillna(result_df_updated[f'{col}_manual'])
        result_df_updated.drop(f'{col}_manual', axis=1, inplace=True)

result_df = result_df_updated
print(f"병합 후 결측치 개수: {result_df.isnull().sum().sum()}")
print(f"최종 데이터 shape: {result_df.shape}")

수기입력 데이터 로드 완료:
Shape: (390, 12)
Columns: ['corp_name', 'corp_code', 'year', 'quarter', 'report_date', '자산총계', '부채총계', '자본총계', '매출액', '영업이익', '분기순이익', 'Unnamed: 11']

result_df 컬럼: ['corp_name', 'corp_code', 'year', 'quarter', 'report_date', '자산총계', '부채총계', '자본총계', '매출액', '영업이익', '분기순이익']
result_df shape: (390, 11)

result_df 미리보기:
  corp_name corp_code  year quarter report_date          자산총계          부채총계  \
0     DL이앤씨  01524093  2015      Q4  2015-12-31  1.206489e+13  7.259125e+12   
1     DL이앤씨  01524093  2016      Q1  2016-03-31  1.233208e+13  7.389715e+12   
2     DL이앤씨  01524093  2016      Q2  2016-06-30  1.237828e+13  7.324959e+12   
3     DL이앤씨  01524093  2016      Q3  2016-09-30  1.218542e+13  7.042494e+12   
4     DL이앤씨  01524093  2016      Q4  2016-12-31  1.239151e+13  7.246135e+12   

           자본총계           매출액          영업이익         분기순이익  
0  4.805769e+12           NaN           NaN           NaN  
1  4.942363e+12  2.253709e+12  9.077587e+10  3.103707e+10  
2  5.053324

In [None]:
# =========================
# DB 연결 및 저장 기능
# =========================

import sys

# DB 폴더의 .env 파일을 명시적으로 로드
current_dir = os.path.dirname(os.path.abspath('dart_data.ipynb'))
parent_dir = os.path.dirname(current_dir)
db_folder = os.path.join(parent_dir, 'DB')
env_file_path = os.path.join(db_folder, '.env')

print(f"환경변수 파일 경로: {env_file_path}")
if os.path.exists(env_file_path):
    from dotenv import load_dotenv
    load_dotenv(env_file_path)
    print(f".env 파일 로드 성공")
    print(f"DB_PASSWORD 설정됨: {'YES' if os.getenv('DB_PASSWORD') else 'NO'}")
else:
    print(f".env 파일을 찾을 수 없습니다: {env_file_path}")

# DB 연결을 위한 경로 추가 및 import
try:
    from database import DatabaseConnection
except ImportError:
    if db_folder not in sys.path:
        sys.path.insert(0, db_folder)
    try:
        from database import DatabaseConnection
    except ImportError:
        print("데이터베이스 모듈을 찾을 수 없습니다. CSV 파일로만 저장됩니다.")
        DatabaseConnection = None

def save_to_database_incremental(df):
    """
    DART 재무데이터를 MySQL 데이터베이스에 증분 저장 (기존 데이터 유지, 없는 것만 추가)
    """
    if DatabaseConnection is None:
        print("데이터베이스 모듈을 사용할 수 없습니다. CSV 파일로만 저장됩니다.")
        return False
        
    try:
        db = DatabaseConnection()
        if not db.connect():
            print("데이터베이스 연결 실패 - CSV 파일로만 저장됩니다.")
            return False
        
        cursor = db.connection.cursor()
        
        # 기존 데이터 확인
        cursor.execute("SELECT COUNT(*) FROM dart_data")
        existing_count = cursor.fetchone()[0]
        print(f"기존 DB 레코드 수: {existing_count}개")
        
        # 기존 데이터의 키 조합 가져오기 (corp_code, year, quarter)
        cursor.execute("""
            SELECT DISTINCT corp_code, year, quarter 
            FROM dart_data
        """)
        existing_keys = set(cursor.fetchall())
        print(f"기존 데이터 키 조합 수: {len(existing_keys)}개")
        
        # 데이터 변환 및 정제
        df_clean = df.copy()
        print(f"=== 신규 데이터 확인 ({len(df_clean)}개 레코드) ===")
        
        # report_date를 DATE 형식으로 변환
        df_clean['report_date'] = pd.to_datetime(df_clean['report_date']).dt.date
        
        # 재무 데이터를 적절한 숫자 형식으로 변환
        financial_columns = ['자산총계', '부채총계', '자본총계', '매출액', '영업이익', '분기순이익']
        for col in financial_columns:
            df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
        
        # 신규 데이터 필터링 (기존에 없는 것만)
        new_records = []
        duplicate_count = 0
        
        for _, row in df_clean.iterrows():
            key = (row['corp_code'], int(row['year']), row['quarter'])
            if key not in existing_keys:
                new_records.append(row)
            else:
                duplicate_count += 1
        
        print(f"중복 데이터 (건너뜀): {duplicate_count}개")
        print(f"신규 추가할 데이터: {len(new_records)}개")
        
        if not new_records:
            print("추가할 신규 데이터가 없습니다.")
            cursor.close()
            db.disconnect()
            return True
        
        # 신규 데이터를 DataFrame으로 변환
        df_new = pd.DataFrame(new_records)
        
        # MySQL 삽입 쿼리
        insert_query = """
        INSERT INTO dart_data (
            corp_name, corp_code, year, quarter, report_date,
            total_assets, total_liabilities, total_equity,
            revenue, operating_profit, quarterly_profit
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        
        # 배치 저장
        batch_size = 50
        total_batches = (len(df_new) + batch_size - 1) // batch_size
        
        print(f"=== 신규 데이터 저장 시작 ({len(df_new)}개 레코드) ===")
        
        for i in range(0, len(df_new), batch_size):
            batch_df = df_new.iloc[i:i + batch_size]
            batch_num = (i // batch_size) + 1
            
            print(f"배치 {batch_num}/{total_batches} 저장 중... ({len(batch_df)}개)")
            
            # 배치 데이터 준비 - NaN 값을 안전하게 처리
            batch_data = []
            for _, row in batch_df.iterrows():
                def safe_value(val):
                    """NaN, inf, 문자열 'nan' 등을 None으로 안전하게 변환"""
                    if pd.isna(val) or val is None:
                        return None
                    if isinstance(val, str) and val.lower() in ['nan', 'inf', '-inf', 'null']:
                        return None
                    try:
                        if pd.isna(pd.Series([val]).iloc[0]):
                            return None
                        return float(val) if val != '' else None
                    except:
                        return None
                
                batch_data.append((
                    row['corp_name'],
                    row['corp_code'],
                    int(row['year']),
                    row['quarter'],
                    row['report_date'],
                    safe_value(row['자산총계']),
                    safe_value(row['부채총계']),
                    safe_value(row['자본총계']),
                    safe_value(row['매출액']),
                    safe_value(row['영업이익']),
                    safe_value(row['분기순이익'])
                ))
            
            cursor.executemany(insert_query, batch_data)
            db.connection.commit()
            print(f"배치 {batch_num} 저장 완료")
            
            if i + batch_size < len(df_new):
                time.sleep(0.3)
        
        print(f"신규 {len(df_new)}개 레코드 추가 완료!")
        
        # 최종 결과 확인
        cursor.execute("SELECT COUNT(*) FROM dart_data")
        final_count = cursor.fetchone()[0]
        added_count = final_count - existing_count
        print(f"최종 DB 레코드 수: {final_count}개 (추가됨: {added_count}개)")
        
        cursor.close()
        db.disconnect()
        return True
        
    except Exception as e:
        print(f"데이터베이스 저장 중 오류 발생: {e}")
        import traceback
        traceback.print_exc()
        if 'cursor' in locals():
            cursor.close()
        if 'db' in locals():
            db.disconnect()
        return False

def check_database_results():
    """데이터베이스에 저장된 DART 데이터 확인"""
    if DatabaseConnection is None:
        print("데이터베이스 모듈을 사용할 수 없습니다.")
        return
    
    try:
        db = DatabaseConnection()
        if not db.connect():
            print("데이터베이스 연결 실패")
            return
        
        cursor = db.connection.cursor()
        
        # 총 레코드 수 확인
        cursor.execute("SELECT COUNT(*) FROM dart_data")
        total_count = cursor.fetchone()[0]
        print(f"총 저장된 레코드 수: {total_count}")
        
        # 기업별 레코드 수 확인
        cursor.execute("""
            SELECT corp_name, COUNT(*) as record_count 
            FROM dart_data 
            GROUP BY corp_name 
            ORDER BY record_count DESC
        """)
        corp_counts = cursor.fetchall()
        print(f"\n기업별 레코드 수:")
        for corp_name, count in corp_counts[:10]:
            print(f"  {corp_name}: {count}개")
        
        # 연도별 레코드 수 확인
        cursor.execute("""
            SELECT year, COUNT(*) as record_count 
            FROM dart_data 
            GROUP BY year 
            ORDER BY year DESC
        """)
        year_counts = cursor.fetchall()
        print(f"\n연도별 레코드 수:")
        for year, count in year_counts:
            print(f"  {year}년: {count}개")
        
        # 최근 데이터 샘플 확인
        cursor.execute("""
            SELECT corp_name, year, quarter, revenue, operating_profit 
            FROM dart_data 
            ORDER BY year DESC, quarter DESC 
            LIMIT 5
        """)
        recent_data = cursor.fetchall()
        print(f"\n최근 데이터 샘플:")
        for corp_name, year, quarter, revenue, operating_profit in recent_data:
            revenue_str = f"{revenue:,.0f}" if revenue else "N/A"
            profit_str = f"{operating_profit:,.0f}" if operating_profit else "N/A"
            print(f"  {corp_name} {year}년 {quarter}: 매출액 {revenue_str}, 영업이익 {profit_str}")
        
        cursor.close()
        db.disconnect()
        
    except Exception as e:
        print(f"데이터베이스 확인 중 오류: {e}")

def load_from_csv_and_save_to_db():
    """CSV 파일에서 데이터를 읽어와 데이터베이스에 증분 저장"""
    csv_files = [
        "./dart_out/건설10_11년로드_2015~2025_연결_분기재무_정규화.csv",
        "./dart_out/건설10_11년로드_2015~2024_연결_분기재무_정규화.csv"
    ]
    
    df_loaded = None
    for csv_file in csv_files:
        if os.path.exists(csv_file):
            try:
                df_loaded = pd.read_csv(csv_file, encoding='utf-8-sig')
                print(f"CSV 파일 로드 성공: {csv_file}")
                break
            except Exception as e:
                print(f"CSV 파일 로드 실패: {csv_file} - {e}")
    
    if df_loaded is None:
        print("사용 가능한 CSV 파일을 찾을 수 없습니다.")
        return False
    
    print(f"CSV 데이터 정보: {len(df_loaded)}개 레코드")
    
    # 데이터베이스에 증분 저장
    db_success = save_to_database_incremental(df_loaded)
    
    if db_success:
        print(f"\n=== 저장 완료! 결과 확인 ===")
        check_database_results()
        return True
    else:
        print(f"\n데이터베이스 저장 실패")
        return False

print("DB 연결 및 저장 함수 정의 완료")

환경변수 파일 경로: c:\Users\baesh\Desktop\IE_Project\DB\.env
.env 파일 로드 성공
DB_PASSWORD 설정됨: YES
DB 연결 및 증분 저장 함수 정의 완료


In [2]:
# =========================
# 데이터베이스 저장 실행
# =========================

# result_df가 이미 존재하는지 확인
if 'result_df' in globals() and not result_df.empty:
    print(f"수집된 데이터 확인:")
    print(f"  - 레코드 수: {len(result_df)}")
    print(f"  - 기업 수: {result_df['corp_name'].nunique()}")
    print(f"  - 연도 범위: {result_df['year'].min()} ~ {result_df['year'].max()}")
    
    # 데이터베이스에 증분 저장
    db_success = save_to_database_incremental(result_df)
    
    if db_success:
        print(f"\n=== 저장 완료! 결과 확인 ===")
        check_database_results()
    else:
        print(f"\n데이터베이스 저장 실패 - CSV에서 로드 시도")
        load_from_csv_and_save_to_db()
        
else:
    print("result_df가 존재하지 않습니다. CSV에서 로드하여 저장을 시도합니다.")
    load_from_csv_and_save_to_db()

result_df가 존재하지 않습니다. CSV에서 로드하여 저장을 시도합니다.


NameError: name 'load_from_csv_and_save_to_db' is not defined