In [10]:
# 검증 전용 (1단계): 배당 공시 데이터에서 stock_code, 공시일 추출
import pandas as pd

# 1️⃣ 데이터 로드
path = "/Users/gun/Desktop/미래에셋 AI 공모전/data/dividend_ml_ready.csv"
df_div = pd.read_csv(path, dtype={"stock_code": str, "rcept_no": str})
print(f"✅ 데이터 로드 완료: {len(df_div):,} rows")

# 2️⃣ 공시일(rcept_dt) 추출 → rcept_no 앞 8자리
df_div["rcept_dt"] = pd.to_datetime(df_div["rcept_no"].str[:8], format="%Y%m%d", errors="coerce")

# 3️⃣ 필요한 컬럼만 정제
df_base = df_div[["stock_code", "rcept_dt"]].dropna().drop_duplicates().reset_index(drop=True)

# 4️⃣ 확인
print(f"✅ 유효한 stock_code + rcept_dt 조합 수: {len(df_base):,}")
print(df_base.head())

# 5️⃣ 저장 (price_fetcher 이후 사용할 수 있도록)
save_path = "/Users/gun/Desktop/미래에셋 AI 공모전/data/base_for_price_check.csv"
df_base.to_csv(save_path, index=False, encoding="utf-8-sig")
print(f"📁 저장 완료: {save_path}")

✅ 데이터 로드 완료: 15,460 rows
✅ 유효한 stock_code + rcept_dt 조합 수: 15,460
  stock_code   rcept_dt
0     900290 2025-06-20
1     900290 2025-03-19
2     900290 2024-12-10
3     900290 2024-11-20
4     900290 2024-11-06
📁 저장 완료: /Users/gun/Desktop/미래에셋 AI 공모전/data/base_for_price_check.csv


In [14]:
# 전체 검증 파이프라인
import pandas as pd
from tqdm.auto import tqdm
from datetime import timedelta
import FinanceDataReader as fdr

# 1️⃣ 기준 테이블 로드 ─────────────────────────────────────
base_path = "/Users/gun/Desktop/미래에셋 AI 공모전/data/base_for_price_check.csv"
df_base = pd.read_csv(base_path, dtype={"stock_code": str})
df_base["rcept_dt"] = pd.to_datetime(df_base["rcept_dt"])
print(f"✅ 기준 데이터 로드: {len(df_base):,} rows")

# 2️⃣ 주가 수집 함수 ─────────────────────────────────────
def fetch_price_history(code: str, start: str, end: str) -> pd.DataFrame:
    """FDR에서 해당 종목코드의 주가 수집"""
    try:
        df = fdr.DataReader(code, start, end)
        df = df.reset_index()[["Date", "Close", "Volume"]]
        df["stock_code"] = code
        df = df.rename(columns={"Date": "date", "Close": "close", "Volume": "volume"})
        return df
    except:
        return pd.DataFrame()

# 3️⃣ 종목별 주가 수집 ─────────────────────────────────────
codes = df_base["stock_code"].unique()
price_all = []

print("📦 주가 수집 시작...")
for code in tqdm(codes):
    start_date = (df_base[df_base["stock_code"] == code]["rcept_dt"].min() - timedelta(days=30)).strftime("%Y-%m-%d")
    end_date   = (df_base[df_base["stock_code"] == code]["rcept_dt"].max() + timedelta(days=30)).strftime("%Y-%m-%d")
    df_price = fetch_price_history(code, start_date, end_date)
    price_all.append(df_price)

df_price_all = pd.concat(price_all, ignore_index=True)
df_price_all["date"] = pd.to_datetime(df_price_all["date"])
print(f"✅ 주가 수집 완료: {len(df_price_all):,} rows")

# 4️⃣ 병합을 위한 컬럼 정제 ───────────────────────────────
df_merge = df_base.merge(df_price_all, on="stock_code", how="left")
df_merge = df_merge[df_merge["date"].notna()]
print(f"🔗 병합 후 데이터 수: {len(df_merge):,}")

# 5️⃣ 윈도우 추출 함수 ─────────────────────────────────────
def select_trading_window(df_group: pd.DataFrame,
                          dt: pd.Timestamp,
                          window: int = 10) -> pd.DataFrame:
    dfg = df_group.sort_values("date").reset_index(drop=True)
    pos = dfg["date"].searchsorted(dt)
    start = max(pos - window, 0)
    end   = min(pos + window + 1, len(dfg))
    return dfg.iloc[start:end]

# 6️⃣ 종목별 공시일 ±10거래일 윈도우 확보 여부 검증 ─────
records = []
grouped = df_merge.groupby(["stock_code", "rcept_dt"])
print("🔍 그룹 수:", grouped.ngroups)

for (code, dt), grp in tqdm(grouped, total=grouped.ngroups, desc="Validating windows"):
    if grp.empty:
        n = 0
    else:
        win = select_trading_window(grp, dt, window=10)
        n = len(win)
    records.append({
        "stock_code": code,
        "rcept_dt":   dt,
        "n_days":     n
    })

df_check = pd.DataFrame(records)
print("✅ 검증 결과 생성:", df_check.shape)

# 7️⃣ 결과 확인 ───────────────────────────────────────────
dist = df_check["n_days"].value_counts().sort_index()
print("\n±10거래일 윈도우 확보 분포 (n_days):")
print(dist.to_string())

# 8️⃣ 부족 이벤트 출력 ───────────────────────────────────
missing = df_check[df_check["n_days"] < 21]
print(f"\n▶ 윈도우 부족 이벤트 수: {len(missing):,}")
if not missing.empty:
    print("\n-- 부족 예시 (최초 5건) --")
    print(missing.head().to_string(index=False))

# 9️⃣ 선택 저장 (필요 시)
save_path = "/Users/gun/Desktop/미래에셋 AI 공모전/data/window_check_result.csv"
df_check.to_csv(save_path, index=False, encoding="utf-8-sig")
print(f"📁 검증 결과 저장 완료: {save_path}")

✅ 기준 데이터 로드: 15,460 rows
📦 주가 수집 시작...


100%|██████████| 1682/1682 [05:56<00:00,  4.72it/s]


✅ 주가 수집 완료: 752,906 rows
🔗 병합 후 데이터 수: 7,638,780
🔍 그룹 수: 4022


Validating windows: 100%|██████████| 4022/4022 [00:00<00:00, 5901.53it/s]

✅ 검증 결과 생성: (4022, 3)

±10거래일 윈도우 확보 분포 (n_days):
n_days
11      92
12       1
13       1
16       1
19       1
20       4
21    3922

▶ 윈도우 부족 이벤트 수: 100

-- 부족 예시 (최초 5건) --
stock_code   rcept_dt  n_days
    100120 2013-02-08      11
    100130 2013-03-04      11
    100220 2013-02-27      11
    100250 2013-02-07      11
    100660 2013-02-28      11
📁 검증 결과 저장 완료: /Users/gun/Desktop/미래에셋 AI 공모전/data/window_check_result.csv





In [38]:
# ──────────────────────────────────────────────────────────────────────────────
# 02_price_fetching.ipynb — 전체 히스토리 한 번만 fetch & 이벤트별 슬라이스
# ──────────────────────────────────────────────────────────────────────────────

import os, time, warnings, threading
from pathlib import Path
from datetime import timedelta
import pandas as pd
import FinanceDataReader as fdr
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor

warnings.filterwarnings("ignore", category=UserWarning)

# ── 1. 설정
BASE       = "/Users/gun/Desktop/미래에셋 AI 공모전/data"
DIV_PATH   = os.path.join(BASE, "dividend_ml_ready.csv")
HIST_PATH  = os.path.join(BASE, "price_history.csv")
FAIL_PATH  = os.path.join(BASE, "failed_codes.csv")
CACHE_DIR  = Path(BASE) / "price_cache"
CACHE_DIR.mkdir(exist_ok=True)

WINDOW_DAYS = 90    # ±일수 (주말 포함)
MAX_RETRY   = 3
BACKOFF     = 2.5
N_THREADS   = 8

# ── 2. 이벤트 로드 & 상장 종목 필터
df_div = pd.read_csv(DIV_PATH, dtype={"stock_code":str,"rcept_no":str})
df_div["stock_code"] = df_div.stock_code.str.zfill(6)
df_div["rcept_dt"]   = pd.to_datetime(df_div.rcept_no.str[:8],
                                      format="%Y%m%d", errors="coerce")
df_base = (
    df_div[["stock_code","rcept_dt"]]
    .dropna()
    .drop_duplicates()
    .reset_index(drop=True)
)

try:
    live = set(fdr.StockListing("KRX")["Code"])
except:
    live = set(df_base.stock_code.unique())
df_base = df_base[df_base.stock_code.isin(live)].reset_index(drop=True)
print(f"✅ 이벤트: {len(df_base):,}건  |  종목 수: {df_base.stock_code.nunique():,}")

# ── 3. 종목별 전체 기간 fetch 함수
lock, failed = threading.Lock(), []

def fetch_code(code: str, start: str, end: str) -> pd.DataFrame | None:
    cache_file = CACHE_DIR / f"{code}.csv"
    # ① 캐시 방어: date 칼럼이 datetime 으로 잘 파싱되는지 확인
    if cache_file.exists():
        try:
            tmp = pd.read_csv(cache_file, parse_dates=["date"])
            if (tmp["date"].dtype.kind == "M"  # datetime64인지
                and tmp["date"].min() <= pd.to_datetime(start)
                and tmp["date"].max() >= pd.to_datetime(end)):
                return tmp
        except Exception:
            pass  # 파싱 실패하면 캐시 무시하고 재수집

    # ② 신규 fetch 시도 (KRX → plain code)
    delay = 1.0
    for attempt in range(1, MAX_RETRY+1):
        try:
            df = fdr.DataReader(f"KRX:{code}", start, end)
        except:
            try:
                df = fdr.DataReader(code, start, end)
            except Exception as e:
                if attempt == MAX_RETRY:
                    with lock:
                        failed.append(code)
                    return None
                time.sleep(delay)
                delay *= BACKOFF
                continue
        df = (
            df.reset_index()[["Date","Close","Volume"]]
              .rename(columns={"Date":"date","Close":"close","Volume":"volume"})
        )
        df["stock_code"] = code
        df.to_csv(cache_file, index=False)
        return df

    return None

# ── 4. 종목별 최소/최대 이벤트 일자에 맞춰 한 번만 fetch
ranges = (
    df_base
    .groupby("stock_code")["rcept_dt"]
    .agg(["min","max"])
    .reset_index()
)
ranges["start"] = (ranges["min"] - timedelta(days=WINDOW_DAYS)).dt.strftime("%Y-%m-%d")
ranges["end"]   = (ranges["max"] + timedelta(days=WINDOW_DAYS)).dt.strftime("%Y-%m-%d")

price_dfs = []
with ThreadPoolExecutor(max_workers=N_THREADS) as exe:
    futures = {
        exe.submit(fetch_code, row.stock_code, row.start, row.end): row.stock_code
        for _, row in ranges.iterrows()
    }
    for fut in tqdm(futures, total=len(futures), desc="fetching codes"):
        df_code = fut.result()
        if df_code is not None:
            price_dfs.append(df_code)

total_rows = sum(len(df) for df in price_dfs)
print(f"✅ 가격 데이터 fetch 완료: rows={total_rows:,}, 실패 종목={len(failed):,}")
pd.DataFrame({"failed_code": failed}).to_csv(FAIL_PATH, index=False)

# ── 5. full_price_history.csv 저장
df_full = pd.concat(price_dfs, ignore_index=True)
FULL_PATH = os.path.join(BASE, "full_price_history.csv")
df_full.to_csv(FULL_PATH, index=False, encoding="utf-8-sig")
print(f"✅ full_price_history.csv 저장 → {FULL_PATH}")

# ── 6. 이벤트별 ±WINDOW_DAYS 슬라이스
price_map = {
    code: grp.sort_values("date").reset_index(drop=True)
    for code, grp in df_full.groupby("stock_code")
}

hist_rows = []
for (code, dt), _ in tqdm(df_base.groupby(["stock_code","rcept_dt"]),
                          total=len(df_base), desc="slicing windows"):
    sub = price_map.get(code)
    if sub is None:
        continue

    pos = sub["date"].searchsorted(dt)
    if pos >= len(sub) or sub.loc[pos,"date"] < dt:
        fut = sub["date"][sub["date"] >= dt]
        if fut.empty:
            continue
        pos = sub["date"].searchsorted(fut.min())

    lo = max(pos - WINDOW_DAYS, 0)
    hi = min(pos + WINDOW_DAYS + 1, len(sub))
    window_df = sub.iloc[lo:hi].copy()

    # 충분한 길이(2*WINDOW_DAYS+1)인지 체크
    if len(window_df) < 2*WINDOW_DAYS + 1:
        continue

    window_df["rcept_dt"] = dt
    hist_rows.append(window_df)

# ── 7. price_history.csv 저장
if hist_rows:
    df_hist = pd.concat(hist_rows, ignore_index=True)
    df_hist = df_hist[["stock_code","rcept_dt","date","close","volume"]]
    df_hist.to_csv(HIST_PATH, index=False, encoding="utf-8-sig")
    print(f"📁 price_history.csv 저장 ({len(df_hist):,} rows)")
else:
    print("⚠️ 슬라이스된 데이터가 없습니다.")

print("\n🎉 전체 히스토리 캐시 & 슬라이스 완료")

✅ 이벤트: 15,460건  |  종목 수: 1,682


fetching codes: 100%|██████████| 1682/1682 [00:30<00:00, 54.56it/s] 


✅ 가격 데이터 fetch 완료: rows=3,296,298, 실패 종목=0
✅ full_price_history.csv 저장 → /Users/gun/Desktop/미래에셋 AI 공모전/data/full_price_history.csv


slicing windows: 100%|██████████| 15460/15460 [00:01<00:00, 10309.02it/s]


📁 price_history.csv 저장 (1,820,860 rows)

🎉 전체 히스토리 캐시 & 슬라이스 완료
