## Step 0-1. 공통 환경 설정 & KIS 토큰 유틸

이 단계에서는 이후 강의에서 재사용할 **KIS API 유틸 세트**를 정의합니다.

역할은 세 가지입니다.

1. `.env` 또는 OS 환경변수에서 KIS 설정값을 읽어오기  
2. KIS API 호출 시 공통으로 사용할 **HTTP 헤더 생성 함수**  
3. OAuth 토큰 발급 함수 (만료 시 재발급까지 포함)

여기서는 토큰을 **사전 검증하지 않습니다.**

- 매번 별도의 테스트 API를 호출해서 토큰을 검사하면 비효율적이기 때문에,  
- 실제로 데이터를 가져오는 API를 호출한 뒤,
  - 응답의 `rt_cd`가 토큰 만료 코드(`EGW00123`, `EGW00121`)일 때만  
  - `refresh_access_token()`을 호출해 토큰을 재발급하고,  
  - 같은 요청을 한 번 더 시도합니다.

정리하면, 토큰 흐름은 이렇게 됩니다.

1. 필요하면 `ensure_kis_token()`으로 “토큰이 하나는 있다” 정도만 맞춰두고
2. 실제 KIS API 호출
3. 응답 `rt_cd` 확인
   - 정상: 그대로 사용
   - 만료 코드: `refresh_access_token()` → 헤더 다시 만들고 → 동일 요청 재시도

이 패턴을 모든 강의에서 그대로 반복하면,
- 토큰 관리 코드는 **Step 0-1에만 모아 두고**
- 각 강의에서는 **실제 데이터 수집/가공 로직**에만 집중할 수 있습니다.


In [None]:
# Step 0-1. KIS 공통 유틸

import os
from pathlib import Path
from typing import Optional

import requests
from dotenv import load_dotenv

# =============================================================================
# 1. 프로젝트 경로 및 데이터 디렉터리 설정
# =============================================================================
PROJECT_ROOT = Path.cwd()

DATA_DIR = PROJECT_ROOT / "data"
DATA_DIR.mkdir(parents=True, exist_ok=True)

# =============================================================================
# 2. .env 로부터 KIS 환경 변수 로딩
#    - 실전에서는 .env 또는 OS 환경변수로 관리
# =============================================================================
load_dotenv(PROJECT_ROOT / ".env")

KIS_APP_KEY: Optional[str] = os.getenv("KIS_APP_KEY")
KIS_APP_SECRET: Optional[str] = os.getenv("KIS_APP_SECRET")
KIS_ACCESS_TOKEN: Optional[str] = os.getenv("KIS_ACCESS_TOKEN")
KIS_URL_BASE: str = os.getenv(
    "KIS_URL_BASE",
    "https://openapivts.koreainvestment.com:29443",  # 기본값: 모의투자
)
#   - 실전: https://openapi.koreainvestment.com:9443
#   - 모의: https://openapivts.koreainvestment.com:29443


# =============================================================================
# 3. 문자열 → float 변환 유틸
# =============================================================================
def to_float(value) -> Optional[float]:
    """
    쉼표/공백이 섞인 문자열을 float로 변환하는 유틸 함수.
    - 예: "12,345 " → 12345.0
    - KIS 응답에서 금액/수치 필드 처리에 재사용.
    """
    try:
        return float(str(value).replace(",", "").strip())
    except Exception:
        return None


# =============================================================================
# 4. KIS 공통 헤더 생성 함수
#    - 이후 모든 API 호출에서 이 함수를 재사용
# =============================================================================
def get_api_headers(tr_id: str) -> dict[str, str]:
    """
    KIS 공통 헤더 생성 (접근토큰 포함).
    - tr_id: KIS 문서에 나오는 TR ID (예: FHKST03010100)
    """
    global KIS_ACCESS_TOKEN

    auth = f"Bearer {KIS_ACCESS_TOKEN}" if KIS_ACCESS_TOKEN else ""

    return {
        "content-type": "application/json",
        "authorization": auth,
        "appkey": KIS_APP_KEY or "",
        "appsecret": KIS_APP_SECRET or "",
        "tr_id": tr_id,
        "custtype": "P",  # 개인
    }


# =============================================================================
# 5. 토큰 발급 API 호출 (PATH/URL/headers/body 패턴 통일)
# =============================================================================
def refresh_access_token() -> Optional[str]:
    """
    KIS OAuth 토큰 재발급.
    - 토큰 발급도 하나의 KIS API 이므로,
      다른 엔드포인트와 동일한 방식으로 호출한다.

    1) PATH, URL 정의
    2) headers, body 정의
    3) requests.post(URL, ...) 호출
    4) access_token 추출 → 전역 변수 + .env에 반영
    """
    global KIS_ACCESS_TOKEN

    if not (KIS_APP_KEY and KIS_APP_SECRET and KIS_URL_BASE):
        raise RuntimeError("KIS_APP_KEY / KIS_APP_SECRET / KIS_URL_BASE 환경변수가 필요합니다.")

    # 1) PATH, URL
    TOKEN_PATH = "oauth2/tokenP"
    TOKEN_URL = f"{KIS_URL_BASE}/{TOKEN_PATH}"

    # 2) headers, body
    headers = {
        "content-type": "application/json",
    }
    body = {
        "grant_type": "client_credentials",
        "appkey": KIS_APP_KEY,
        "appsecret": KIS_APP_SECRET,
    }

    # 3) API 호출
    response = requests.post(TOKEN_URL, headers=headers, json=body, timeout=5)
    response.raise_for_status()

    data = response.json()
    access_token = data.get("access_token")
    if not access_token:
        raise RuntimeError(f"KIS 토큰 발급 실패: {data}")

    # 4) 전역 변수 갱신
    KIS_ACCESS_TOKEN = access_token

    # 5) .env 파일에도 반영 (편의를 위한 선택 사항)
    env_path = PROJECT_ROOT / ".env"
    try:
        lines: list[str] = []
        if env_path.exists():
            lines = env_path.read_text(encoding="utf-8").splitlines()

        new_lines = []
        token_written = False
        for line in lines:
            if line.startswith("KIS_ACCESS_TOKEN="):
                new_lines.append(f"KIS_ACCESS_TOKEN={access_token}")
                token_written = True
            else:
                new_lines.append(line)

        if not token_written:
            new_lines.append(f"KIS_ACCESS_TOKEN={access_token}")

        env_path.write_text("\n".join(new_lines), encoding="utf-8")
    except Exception:
        # 강의용: .env 업데이트 실패는 치명적 에러로 보지 않는다.
        pass

    return access_token


# =============================================================================
# 6. 선택적: 최초 한 번 토큰 보장용 (사전 헬스체크는 하지 않음)
# =============================================================================
def ensure_kis_token() -> Optional[str]:
    """
    강의 코드에서 '토큰이 하나는 있어야 한다'는 전제만 맞추고 싶을 때 사용하는 함수.

    - 메모리에 토큰이 없으면: refresh_access_token() 호출
    - 이미 있으면: 그대로 사용
    - 토큰 만료 여부는 각 API 호출 응답(rt_cd)에서 판단하고,
      그때 refresh_access_token()을 호출한다.
    """
    global KIS_ACCESS_TOKEN

    if not KIS_ACCESS_TOKEN:
        return refresh_access_token()
    return KIS_ACCESS_TOKEN


## Step 0-2. KRX MST 공식 파일 전처리 (주식 마스터만 생성)

이제 KRX에서 제공하는 **MST 종목정보 파일**을 파싱해서  
ETF 분석에 쓸 **주식 마스터 테이블**을 만들어 보겠습니다.

이번 강의에서는 ETF 분석에 필요한 **주식 코드/이름 정보만** 사용하므로,  
KRX에서 내려받은 MST 파일 중에서:

- `kospi_code.mst`
- `kosdaq_code.mst`
- `konex_code.mst`

세 가지만 사용해서 하나의 통합 테이블인  
`mst_fixed/equity_master.csv` 를 만들어 둡니다.

나중에는 이 `equity_master`만 읽어도

- 종목 코드(단축코드, 표준코드)
- 한글 종목명

을 바로 조회할 수 있기 때문에,  
분석 코드에서는 더 이상 MST 원본을 직접 다룰 필요가 없습니다.


> 먼저, 한국투자증권 오픈API 개발자센터에 접속해서  
> `API 문서` → `종목정보파일` 메뉴에서  
> `kospi_code.mst`, `kosdaq_code.mst`, `konex_code.mst` 파일을 내려받은 뒤  
> 프로젝트의 `mst_raw/` 폴더 아래에 저장해 둡니다.


In [None]:
from pathlib import Path
import pandas as pd

# MST 원본/결과 폴더
RAW_MST_DIR = PROJECT_ROOT / "mst_raw"
OUT_DIR     = PROJECT_ROOT / "mst_fixed"

RAW_MST_DIR.mkdir(parents=True, exist_ok=True)
OUT_DIR.mkdir(parents=True, exist_ok=True)


def read_mst_lines(path: Path) -> list[bytes]:
    """
    MST 파일 한 줄은 '고정 길이 레코드'이기 때문에
    텍스트가 아니라 '바이트 단위'로 읽어오는 것이 안전합니다.
    """
    lines = []
    with path.open("rb") as f:
        for raw in f:
            # 레코드 한 줄에서 개행(\r\n) 제거
            line = raw.rstrip(b"\r\n")
            if line:  # 완전히 빈 줄은 스킵
                lines.append(line)
    if not lines:
        raise ValueError(f"{path} is empty")
    return lines


# MST 파일에서 사용하는 공통 길이 정의 (바이트 단위)
SZ_SHRNCODE  = 9   # 단축코드 길이 (보통 6~9자리, KRX가 9바이트로 잡아둔 필드)
SZ_STNDCODE  = 12  # 표준코드 길이 (ISIN/표준코드 계열, 12바이트)
SZ_KORNAME   = 40  # 한글 종목명 (40바이트 고정폭)
SZ_KORNAME20 = 20  # 짧은 한글명/회원명 등 (20바이트 고정폭)

# 우리가 뽑아 쓸 필드 스키마 정의
EQUITY_SCHEMA = [
    ("short_code", SZ_SHRNCODE),  # 단축코드 (예: A005930 형태, 9바이트)
    ("std_code",   SZ_STNDCODE),  # 표준코드
    ("name",       SZ_KORNAME),   # 한글 종목명
]


def parse_fixed_width_lines(
    lines: list[bytes],
    schema,
    *,
    encoding: str = "cp949",
) -> pd.DataFrame:
    """
    MST와 같이 '한 줄이 고정폭 레코드'인 바이너리 파일을
    schema 정보를 사용해 잘라서 DataFrame으로 변환하는 함수.
    """
    # 가장 긴 레코드 길이를 기준으로 필드를 자른다.
    record_len = max(len(l) for l in lines)

    # 각 필드의 시작 위치/길이 계산
    offsets: list[tuple[str, int, int]] = []
    offset = 0
    for name, length in schema:
        offsets.append((name, offset, length))
        offset += length

    # 결과 컬럼 초기화
    columns = {name: [] for name, _ in schema}

    for line in lines:
        # 혹시 레코드가 짧으면 오른쪽을 공백으로 패딩
        if len(line) < record_len:
            line = line.ljust(record_len, b" ")

        for name, start, length in offsets:
            raw = line[start:start + length]
            # cp949로 디코딩 후 오른쪽 공백 제거
            columns[name].append(raw.decode(encoding, errors="ignore").rstrip())

    return pd.DataFrame(columns)


def build_equity_master() -> pd.DataFrame:
    """
    - kospi/kosdaq/konex MST 파일을 모두 읽어서
    - (short_code, std_code, name)만 추출한 뒤
    - 하나의 통합 테이블(equity_master.csv)을 생성.
    """
    dfs: list[pd.DataFrame] = []

    for fname in ("kospi_code.mst", "kosdaq_code.mst", "konex_code.mst"):
        path = RAW_MST_DIR / fname
        if not path.exists():
            print(f"⚠️ {path} 없음, 스킵")
            continue

        lines = read_mst_lines(path)
        df = parse_fixed_width_lines(lines, EQUITY_SCHEMA)
        dfs.append(df)

    if not dfs:
        raise RuntimeError("MST 원본을 찾지 못했습니다. mst_raw 폴더를 확인하세요.")

    merged = pd.concat(dfs, ignore_index=True)

    out_csv = OUT_DIR / "equity_master.csv"
    merged.to_csv(out_csv, index=False, encoding="utf-8-sig")
    print(f"[SAVE] equity_master.csv → {out_csv}")

    return merged


# 실제로 한 번 생성해보고 상위 5행만 확인
eq_master = build_equity_master()
print("equity_master preview:")
print(eq_master.head())


[SAVE] equity_master.csv → /Users/ulift/workspace/py-etf-mango/script2/mst_fixed/equity_master.csv
equity_master preview:
  short_code      std_code             name
0  F70100022  KR5701000220     한투미국경제주도1(A)
1  F70100023  KR5701000238   한투미국경제주도1(A-e)
2  F70100024  KR5701000246     한투미국경제주도2(A)
3  F70100025  KR5701000253   한투미국경제주도2(A-e)
4  F70100026  KR5701000261  한투글로벌넥스트웨이브1(A)


## Step 0-3. `equity_master` 로딩 & 코드/이름 조회 유틸

`mst_fixed/equity_master.csv`를 한 번 만들어두었으니  
이제는 원본 MST 파일이 아니라 **마스터 CSV**만 읽고 재사용하면 됩니다.

여기서는 다음 유틸 함수들을 정의합니다.

- `load_equity_master()`
  - `equity_master.csv` (또는 parquet)을 읽어서 DataFrame 반환
- `_extract_6digit_code(s)`
  - `A005930` 같은 문자열에서 **숫자만 추출 → 마지막 6자리**를 종목 코드로 사용
- `find_code_by_name(kor_name)`
  - 한글 종목명에 특정 문자열이 포함된 종목들을 검색
- `pick_single_code(kor_name)`
  - 검색 결과 중 첫 번째 종목을 선택하고, KIS 코드(6자리)를 반환

이 유틸들은 앞으로

- ETF 코드 찾기,
- 구성 종목 이름 붙이기

같은 작업에 계속 재사용됩니다.


In [None]:
def load_equity_master() -> pd.DataFrame:
    """
    mst_fixed/equity_master.(parquet|csv)를 읽어서 DataFrame으로 반환.
    - 추후 성능을 위해 parquet로 변환해둘 수도 있어서 두 가지를 모두 지원.
    """
    eq_path_parquet = OUT_DIR / "equity_master.parquet"
    eq_path_csv = OUT_DIR / "equity_master.csv"

    if eq_path_parquet.exists():
        df = pd.read_parquet(eq_path_parquet)
    elif eq_path_csv.exists():
        df = pd.read_csv(eq_path_csv)
    else:
        raise FileNotFoundError(
            f"equity_master 파일을 찾을 수 없습니다: {eq_path_parquet} / {eq_path_csv}"
        )

    # 앞뒤 공백 정리
    for col in ("name", "short_code", "std_code"):
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()

    return df


def _extract_6digit_code(s: str) -> Optional[str]:
    """
    'A005930' 같은 문자열에서 숫자만 모아서
    마지막 6자리를 KIS 종목코드로 사용.
    """
    if not isinstance(s, str):
        return None

    digits = "".join(ch for ch in s if ch.isdigit())
    if len(digits) < 6:
        return None

    return digits[-6:]


def find_code_by_name(kor_name: str) -> pd.DataFrame:
    """
    한글 종목명에 특정 문자열이 포함된 종목 리스트를 반환.
    예: find_code_by_name("KODEX 200")
    """
    df = load_equity_master()

    mask = df["name"].str.contains(kor_name, case=False, na=False)
    candidates = df[mask].copy()

    if candidates.empty:
        print(f"⚠️ 이름에 '{kor_name}' 이(가) 포함된 종목을 찾지 못했습니다.")
        return candidates

    # KIS용 6자리 코드 컬럼이 없다면 생성
    if "kis_code" not in candidates.columns:
        if "short_code" in candidates.columns:
            candidates["kis_code"] = candidates["short_code"].apply(_extract_6digit_code)
        elif "std_code" in candidates.columns:
            candidates["kis_code"] = candidates["std_code"].apply(_extract_6digit_code)
        else:
            candidates["kis_code"] = None

    return candidates


def pick_single_code(kor_name: str) -> str:
    """
    검색 결과 중 첫 번째 종목을 선택하여
    - 종목명
    - KIS 코드(6자리)
    를 출력하고, 코드만 반환.
    """
    candidates = find_code_by_name(kor_name)
    if candidates.empty:
        raise ValueError(f"'{kor_name}' 에 해당하는 종목을 찾을 수 없습니다.")

    row = candidates.iloc[0]
    kis_code = row.get("kis_code")
    if not kis_code:
        raise ValueError(
            f"'{kor_name}' 후보에서 KIS 코드(6자리)를 추출하지 못했습니다.\n{row}"
        )

    print(f"[INFO] '{kor_name}' → 선택된 종목: {row.get('name')} (kis_code={kis_code})")
    return str(kis_code)


## Step 0-4. ETF 코드 → 한글 이름 조회 (MST + KIS API)

이제 마지막으로, ETF 6자리 코드를 넣으면:

1. 먼저 **MST 마스터(`equity_master`)에서 이름을 찾아보고**
2. 거기에 없으면 **KIS OpenAPI**를 한 번 조회해서 이름을 보완하는

`get_etf_name(etf_code)` 함수를 만들어 보겠습니다.

이 함수는 뒤 강의(구성 종목 시세 조회, ETF 스냅샷, 그래프/표 출력 등)에서  
**그래프/표에 ETF 이름을 깔끔하게 표시하는 용도**로 계속 재사용하게 됩니다.


In [None]:
def get_etf_name_from_mst(etf_code: str) -> Optional[str]:
    """
    equity_master에서 KIS 6자리 코드로 ETF 이름을 찾는다.
    (없으면 None)
    """
    df = load_equity_master()

    # 아직 kis_code 컬럼이 없다면 생성
    if "kis_code" not in df.columns:
        if "short_code" in df.columns:
            df["kis_code"] = df["short_code"].apply(_extract_6digit_code)
        elif "std_code" in df.columns:
            df["kis_code"] = df["std_code"].apply(_extract_6digit_code)

    if "kis_code" not in df.columns:
        return None

    mask = df["kis_code"] == str(etf_code)
    hit = df[mask]
    if hit.empty:
        return None

    name = str(hit.iloc[0]["name"]).strip()
    return name or None


def get_etf_name_from_kis(etf_code: str) -> Optional[str]:
    """
    KIS '종목검색' API를 사용해 ETF 이름을 조회.
    (MST에서 못 찾았을 때 보완용)
    """
    if not KIS_URL_BASE:
        return None

    PATH = "uapi/domestic-stock/v1/quotations/search-stock-info"
    URL  = f"{KIS_URL_BASE}/{PATH}"

    params = {
        "PRDT_TYPE_CD": "512",   # 512: ETF/ETN 상품코드 (문서 기준)
        "PDNO": etf_code,        # 종목 코드(6자리)
    }
    headers = get_api_headers("CTPF1604R")

    try:
        res = requests.get(url, headers=headers, params=params, timeout=5)
        data = res.json()
    except Exception as e:
        print(f"⚠️ KIS ETF 이름 조회 실패: {e}")
        return None

    if rt_cd in ["EGW00123", "EGW00121"]:
        refresh_access_token()
        headers = get_api_headers("CTPF1604R")
        try:
            res = requests.get(URL, headers=headers, params=params, timeout=5)
            data = res.json()
        except Exception:
            return None
        rt_cd = data.get("rt_cd")

    # 최종 rt_cd 확인
    if rt_cd != "0" or not data.get("output"):
        return None

    output = data["output"]
    row = output[0] if isinstance(output, list) else output

    # 응답 필드 이름은 KIS 문서 기준
    name = row.get("prdt_name") or row.get("hts_kor_isnm")
    name = (name or "").strip()

    return name or None


def get_etf_name(etf_code: str) -> str:
    """
    ETF 코드(6자리)를 넣으면
    - MST → 없으면 KIS 순으로 이름을 찾고
    - 둘 다 실패하면 코드 자체를 반환.
    """
    code = str(etf_code)

    # 1) MST에서 먼저 찾기
    name = get_etf_name_from_mst(code)
    if name:
        print(f"ETF 이름(MST): {code} → {name}")
        return name

    # 2) KIS API에서 보완
    name = get_etf_name_from_kis(code)
    if name:
        print(f"ETF 이름(KIS): {code} → {name}")
        return name

    print(f"ETF 이름 조회 실패 → 코드 사용 ({code})")
    return code


In [None]:
# Step 0 전체 동작 테스트

# 1) equity_master 로딩
eq = load_equity_master()
print("equity_master shape:", eq.shape)
print(eq.head())

# 2) 코드 → ETF 이름 (MST 우선, 필요 시 KIS)
ensure_kis_token()

kodex200_code = "069500"  # KODEX 200 예시
etf_name = get_etf_name(kodex200_code)
print("\n최종 ETF 이름:", etf_name)


equity_master shape: (4412, 3)
  short_code      std_code             name
0  F70100022  KR5701000220     한투미국경제주도1(A)
1  F70100023  KR5701000238   한투미국경제주도1(A-e)
2  F70100024  KR5701000246     한투미국경제주도2(A)
3  F70100025  KR5701000253   한투미국경제주도2(A-e)
4  F70100026  KR5701000261  한투글로벌넥스트웨이브1(A)
ETF 이름(MST): 069500 → KODEX 200

최종 ETF 이름: KODEX 200
