In [1]:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup
import pandas as pd
import re
import requests
import cx_Oracle
import FinanceDataReader as fdr

In [2]:
# 🟢 Selenium 설정
chrome_option = Options()
chrome_option.add_experimental_option('detach', True)
driver = webdriver.Chrome(options=chrome_option)

# 🟢 네이버 금융에서 업종 데이터 크롤링
url = 'https://finance.naver.com/sise/sise_group.naver?type=upjong'
driver.get(url)
driver.implicitly_wait(2)
soup = BeautifulSoup(driver.page_source, 'html.parser')

rows = driver.find_elements(By.CSS_SELECTOR, '.type_1 > tbody:nth-child(3) > tr')

sector_data = []

for row in rows:
    cols = row.find_elements(By.TAG_NAME, 'td')
    if len(cols) >= 6:
        sector_link = cols[0].find_element(By.TAG_NAME, 'a').get_attribute('href')
        match = re.search(r'no=(\d+)', sector_link)

        sector_code = match.group(1) if match else None
        sector_info = {
            '업종코드': sector_code,
            '업종명': cols[0].text.strip(),
        }
        sector_data.append(sector_info)

# 🟡 DataFrame 변환
sector_df = pd.DataFrame(sector_data)
driver.quit()

# 🟢 업종별 종목 리스트 크롤링 함수
def sector_detail_url(sector_name):
    sector_code = sector_df[sector_df['업종명'] == sector_name]['업종코드'].values
    if len(sector_code) == 0:
        print(f"존재하지 않는 업종: {sector_name}")
        return None
    return f'https://finance.naver.com/sise/sise_group_detail.naver?type=upjong&no={sector_code[0]}'

# 🟢 업종별 종목 리스트 크롤링
sector_list = sector_df['업종명'].tolist()
sector_by_stock_list = []

for sector in sector_list:
    url = sector_detail_url(sector)
    if not url:
        continue

    res = requests.get(url)
    if res.status_code != 200:
        print(f'데이터를 가져오는 데 실패했습니다: {sector}')
        continue

    soup = BeautifulSoup(res.content, 'lxml')
    rows = soup.select('#contentarea > div:nth-child(5) > table > tbody > tr')

    for tr in rows:
        cols = tr.find_all("td")

        if len(cols) >= 9:
            stock_name = cols[0].text.strip()
            is_kosdaq = '*' in stock_name
            stock_info = {
                '업종명': sector,
                '종목명': stock_name.replace('*', '').strip(),
                '코스닥 여부': is_kosdaq
            }
            sector_by_stock_list.append(stock_info)

# 🟡 최신 종목 업종 정보 DataFrame 저장
info_df = pd.DataFrame(sector_by_stock_list)
info_df.to_csv('sector_code_stock.csv', index=False, encoding='utf-8-sig')
print("✅ 최신 업종 데이터 저장 완료: sector_code_stock.csv")

# 🟢 시장 데이터 + Sector 추가 함수
def add_sector_to_market_data(market):
    """
    시장 데이터에 Sector(업종) 정보를 추가하여 반환하는 함수
    """
    if market == 'KOSPI':
        df = fdr.StockListing('KOSPI')
    elif market == 'KOSDAQ':
        df = fdr.StockListing('KOSDAQ')
    elif market == 'ETF':
        df = fdr.StockListing('ETF/KR')
    else:
        raise ValueError("지원하지 않는 시장입니다.")

    # CSV 파일에서 최신 업종 데이터 로드
    info_df = pd.read_csv('sector_code_stock.csv', encoding='utf-8-sig')

    # KOSPI, KOSDAQ은 업종 정보 병합
    if market in ['KOSPI', 'KOSDAQ']:
        df = df.merge(info_df[['종목명', '업종명']], left_on='Name', right_on='종목명', how='left')
        df.rename(columns={'업종명': 'Sector'}, inplace=True)

    # ETF의 경우, Sector 값 부여
    if market == 'ETF':
        df.rename(columns={'Symbol': 'Code', 'MarCap': 'Marcap'}, inplace=True)
        category_decode = {
            1: '국내 시장지수', 2: '국내 업종/테마', 3: '국내파생',
            4: '해외주식', 5: '원자재', 6: '채권', 7: '기타ETF'
        }
        df['Sector'] = df['Category'].map(category_decode)

    return df[['Code', 'Name', 'Sector']]

✅ 최신 업종 데이터 저장 완료: sector_code_stock.csv


In [3]:
# 🟢 STOCKS 테이블 업데이트 + SECTORS 테이블 자동 추가 함수
def update_stocks_table(market, market_id, sector_df):
    df = add_sector_to_market_data(market)

    # KOSDAQ의 경우 '*' 추가
    if market == 'KOSDAQ':
        df['Sector'] += '*'

    dsn = cx_Oracle.makedsn('localhost', 1521, service_name='xe')
    with cx_Oracle.connect(user='c##PROJECT', password='k5002', dsn=dsn) as connection:
        with connection.cursor() as cursor:
            
            # 🟡 SECTORS 테이블에서 SEC_ID 매핑 및 없는 Sector 찾기
            sector_id_mapping = {}
            missing_sectors = []  # SECTORS 테이블에 없는 Sector 저장 리스트

            for sector in df['Sector'].unique():
                cursor.execute("SELECT SEC_ID FROM SECTORS WHERE SEC_NM = :sector", {'sector': sector})
                result = cursor.fetchone()
                if result:
                    sector_id_mapping[sector] = result[0]
                else:
                    missing_sectors.append(sector)  # 존재하지 않는 Sector 저장

            # SECTORS 테이블에 없는 값 출력
            if missing_sectors:
                print(f"⚠️ SECTORS 테이블에 없는 Sector 값: {missing_sectors}")

            # 🟡 SECTORS 테이블에 없는 값 자동 추가
            for sector in missing_sectors:
                sector_code = sector_df[sector_df['업종명'] == sector]['업종코드'].values
                sector_code = sector_code[0] if len(sector_code) > 0 else 'UNKNOWN'

                cursor.execute("""
                    INSERT INTO SECTORS (SEC_ID, SEC_NM, SEC_CODE) 
                    VALUES (SECTORS_SEQ.NEXTVAL, :sector, :sec_code)
                """, {'sector': sector, 'sec_code': sector_code})

                # 새로 추가된 SEC_ID 가져오기
                cursor.execute("SELECT SEC_ID FROM SECTORS WHERE SEC_NM = :sector", {'sector': sector})
                sector_id_mapping[sector] = cursor.fetchone()[0]

            # SEC_ID 매핑 적용
            df['SEC_ID'] = df['Sector'].map(sector_id_mapping)

            # 🟡 SEC_ID가 None인 데이터는 제거 (DPI-1043 방지)
            df = df.dropna(subset=['SEC_ID'])

            # 🟡 STOCKS 테이블에서 중복 확인 후 업데이트 수행
            for index, row in df.iterrows():
                cursor.execute("SELECT STK_ID, SEC_ID FROM STOCKS WHERE STK_CODE = :stk_code", {'stk_code': row['Code']})
                result = cursor.fetchone()

                if result:  # 이미 존재하는 경우 -> 필요 시 SEC_ID 업데이트
                    existing_stk_id, existing_sec_id = result
                    if existing_sec_id != row['SEC_ID']:  # 업종이 변경된 경우
                        cursor.execute("""
                            UPDATE STOCKS 
                            SET SEC_ID = :sec_id 
                            WHERE STK_ID = :stk_id
                        """, {'sec_id': row['SEC_ID'], 'stk_id': existing_stk_id})
                else:  # 새로운 종목이면 INSERT
                    try:
                        cursor.execute("""
                            INSERT INTO STOCKS (STK_ID, STK_CODE, STK_NM, SEC_ID, MARKET_ID, CDATE)
                            VALUES (STOCKS_SEQ.NEXTVAL, :stk_code, :stk_nm, :sec_id, :market_id, SYSDATE)
                        """, {
                            'stk_code': row['Code'],
                            'stk_nm': row['Name'],
                            'sec_id': row['SEC_ID'],
                            'market_id': market_id
                        })
                    except cx_Oracle.DatabaseError as e:
                        error, = e.args
                        print(f"❌ Error occurred while inserting row {index}: {error.message}")

            connection.commit()
            print(f"✅ {market} STOCKS 테이블 업데이트 완료")

In [4]:
# 🟡 SECTOR 데이터 (업종 코드 포함) 불러오기
sector_df = pd.read_csv('sector_code_stock.csv', encoding='utf-8-sig')

In [5]:
# 🟢 KOSPI, KOSDAQ, ETF 업데이트 실행
update_stocks_table('KOSPI', market_id=1, sector_df=sector_df)
update_stocks_table('KOSDAQ', market_id=2, sector_df=sector_df)
update_stocks_table('ETF', market_id=3, sector_df=sector_df)

✅ KOSPI STOCKS 테이블 업데이트 완료
✅ KOSDAQ STOCKS 테이블 업데이트 완료
✅ ETF STOCKS 테이블 업데이트 완료
