In [1]:
%pip install pandas_datareader pykrx yfinance

Collecting pykrx
  Downloading pykrx-1.0.48-py3-none-any.whl.metadata (60 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.9/60.9 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
Collecting datetime (from pykrx)
  Downloading DateTime-5.5-py3-none-any.whl.metadata (33 kB)
Collecting zope.interface (from datetime->pykrx)
  Downloading zope.interface-7.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.4/44.4 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
Downloading pykrx-1.0.48-py3-none-any.whl (2.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m29.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading DateTime-5.5-py3-none-any.whl (52 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.6/52.6 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading zope.interface-7.2-cp311-cp311-

# src

## common

### path.py

In [2]:
class PATH:
    import os

    try:
        ROOT = os.path.dirname(__file__)
        while not ROOT.endswith('yuho'):
            ROOT = os.path.dirname(ROOT)
    except NameError:
        ROOT = 'https://raw.githubusercontent.com/labwons/yuho/main/'

    try:
        DESKTOP = os.path.join(os.path.join(os.environ['USERPROFILE']), 'Desktop')
        DOWNLOADS = os.path.join(os.path.join(os.environ['USERPROFILE']), 'Downloads')
    except KeyError:
        DESKTOP = DOWNLOADS = ROOT

    GROUP  = os.path.join(ROOT, r'src/fetch/market/json/group.json')
    INDEX  = os.path.join(ROOT, r'src/fetch/market/json/index.json')
    SPEC  = os.path.join(ROOT, r'src/fetch/market/json/spec.json')
    STATE  = os.path.join(ROOT, r'src/fetch/market/json/state.json')

    BUBBLE = os.path.join(ROOT, r'docs/src/json/bubble.json')
    MAP    = os.path.join(ROOT, r'docs/src/json/treemap.json')
    MACRO  = os.path.join(ROOT, r'docs/src/json/macro.json')



if __name__ == "__main__":
    print(PATH.ROOT)
    print(PATH.GROUP)
    print(PATH.STATE)
    print(PATH.SPEC)
    print(PATH.INDEX)
    print(PATH.MAP)

https://raw.githubusercontent.com/labwons/yuho/main/
https://raw.githubusercontent.com/labwons/yuho/main/src/fetch/market/json/group.json
https://raw.githubusercontent.com/labwons/yuho/main/src/fetch/market/json/state.json
https://raw.githubusercontent.com/labwons/yuho/main/src/fetch/market/json/spec.json
https://raw.githubusercontent.com/labwons/yuho/main/src/fetch/market/json/index.json
https://raw.githubusercontent.com/labwons/yuho/main/docs/src/json/treemap.json


## fetch

### market

#### group.py

In [None]:
from pandas import (
    DataFrame,
    concat,
    Index,
    read_json,
    Series
)
from pykrx.stock import get_index_portfolio_deposit_file
from re import compile
from requests import get
from requests.exceptions import JSONDecodeError
from time import sleep, time
from typing import (
    Dict,
    Iterable,
    List
)
if "PATH" not in globals():
    try:
        from ...common.path import PATH
    except ImportError:
        from src.common.path import PATH


SECTOR_CODE:Dict[str, str] = {
    'WI100': '에너지', 'WI110': '화학',
    'WI200': '비철금속', 'WI210': '철강', 'WI220': '건설', 'WI230': '기계', 'WI240': '조선', 'WI250': '상사,자본재', 'WI260': '운송',
    'WI300': '자동차', 'WI310': '화장품,의류', 'WI320': '호텔,레저', 'WI330': '미디어,교육', 'WI340': '소매(유통)',
    'WI400': '필수소비재', 'WI410': '건강관리',
    'WI500': '은행', 'WI510': '증권', 'WI520': '보험',
    'WI600': '소프트웨어', 'WI610': 'IT하드웨어', 'WI620': '반도체', 'WI630': 'IT가전', 'WI640': '디스플레이',
    'WI700': '통신서비스',
    'WI800': '유틸리티'
}
CODE_LABEL:Dict[str, str] = {
    'CMP_CD': 'ticker', 'CMP_KOR': 'name',
    'SEC_CD': 'sectorCode', 'SEC_NM_KOR': 'sectorName',
    'IDX_CD': 'industryCode', 'IDX_NM_KOR': 'industryName',
}
REITS_CODE:Dict[str, str] = {
    "088980": "맥쿼리인프라",
    "395400": "SK리츠",
    "365550": "ESR켄달스퀘어리츠",
    "330590": "롯데리츠",
    "348950": "제이알글로벌리츠",
    "293940": "신한알파리츠",
    "432320": "KB스타리츠",
    "094800": "맵스리얼티1",
    "357120": "코람코라이프인프라리츠",
    "448730": "삼성FN리츠",
    "451800": "한화리츠",
    "088260": "이리츠코크렙",
    "334890": "이지스밸류리츠",
    "377190": "디앤디플랫폼리츠",
    "404990": "신한서부티엔디리츠",
    "417310": "코람코더원리츠",
    "400760": "NH올원리츠",
    "350520": "이지스레지던스리츠",
}


class MarketGroup(DataFrame):

    _log:List[str] = []

    def __init__(self, update:bool=True):
        stime = time()
        if not update:
            super().__init__(read_json(PATH.GROUP, orient='index'))
            self.index = self.index.astype(str).str.zfill(6)
            self.index.name = 'ticker'
            return

        date = self.fetchTradingDate()
        self.log = f'Begin [Market Group Fetch] @{date}'
        objs, size = [], len(SECTOR_CODE) + 1
        for n, (code, name) in enumerate(SECTOR_CODE.items()):
            obj = self.fetchWiseGroup(code, date)
            objs.append(obj)
            proc = f"... {str(n + 1).zfill(2)} / {size} : {code} {name} :: "
            self.log = f"{proc}Fail" if obj.empty else f"{proc}Success"

        reits = DataFrame(data={'CMP_KOR': REITS_CODE.values(), 'CMP_CD':REITS_CODE.keys()})
        reits[['SEC_CD', 'IDX_CD', 'SEC_NM_KOR', 'IDX_NM_KOR']] \
              = ['G99', 'WI999', '리츠', '리츠']
        objs.append(reits)

        self.log = f"... {size} / {size} : WI999 리츠 :: Success"

        super().__init__(concat(objs, axis=0, ignore_index=True))
        self.drop(columns=[key for key in self if not key in CODE_LABEL], inplace=True)
        self.drop(index=self[self['SEC_CD'].isna()].index, inplace=True)
        self.rename(columns=CODE_LABEL, inplace=True)
        self.set_index(keys="ticker", inplace=True)
        self['industryName'] = self['industryName'].str.replace("WI26 ", "")

        kq, lg = self.fetchKosdaqList(self.index), self.fetchLargeCapList(self.index)
        self.loc[kq, 'name'] = self.loc[kq, 'name'] + '*'
        self.loc[lg, 'stockSize'] = 'large'
        self.log = "... Identify Tickers: Success"

        sc_mdi = self[(self['industryCode'] == 'WI330') & (self['sectorCode'] == 'G50')].index
        sc_edu = self[(self['industryCode'] == 'WI330') & (self['sectorCode'] == 'G25')].index
        sc_sw = self[(self['industryCode'] == 'WI600') & (self['sectorCode'] == 'G50')].index
        sc_it = self[(self['industryCode'] == 'WI600') & (self['sectorCode'] == 'G45')].index
        self.loc[sc_mdi, 'industryCode'], self.loc[sc_mdi, 'industryName'] = 'WI331', '미디어'
        self.loc[sc_edu, 'industryCode'], self.loc[sc_edu, 'industryName'] = 'WI332', '교육'
        self.loc[sc_sw, 'industryCode'], self.loc[sc_sw, 'industryName'] = 'WI601', '소프트웨어'
        self.loc[sc_it, 'industryCode'], self.loc[sc_it, 'industryName'] = 'WI602', 'IT서비스'

        self.log = f'End [Market Group Fetch] / Elapsed: {time() - stime:.2f}s'
        return

    @property
    def log(self) -> str:
        return "\n".join(self._log)

    @log.setter
    def log(self, log:str):
        self._log.append(log)

    @classmethod
    def fetchTradingDate(cls) -> str:
        URL = 'https://www.wiseindex.com/Index/Index#/G1010.0.Components'
        pattern = compile(r"var\s+dt\s*=\s*'(\d{8})'")
        return pattern.search(get(URL).text).group(1)

    @classmethod
    def fetchWiseGroup(cls, code:str, date:str="", countdown:int=5) -> DataFrame:
        resp = get(f'http://www.wiseindex.com/Index/GetIndexComponets?ceil_yn=0&dt={date}&sec_cd={code}')
        try:
            return DataFrame(resp.json()['list'])
        except JSONDecodeError:
            if countdown == 0:
                return DataFrame()
            sleep(5)
            return cls.fetchWiseGroup(code, date, countdown - 1)

    @classmethod
    def fetchKosdaqList(cls, _tickers:Index=None) -> List[str]:
        tickers = get_index_portfolio_deposit_file('2001')
        if not _tickers.empty:
            tickers = [ticker for ticker in _tickers if ticker in tickers]
        return tickers

    @classmethod
    def fetchLargeCapList(cls, _tickers:Index=None) -> List[str]:
        tickers = get_index_portfolio_deposit_file('2203') \
                + get_index_portfolio_deposit_file('1028')
        if not _tickers.empty:
            tickers = [ticker for ticker in _tickers if ticker in tickers]
        return tickers

if __name__ == "__main__":
    marketGroup = MarketGroup(True)
    # print(marketGroup)
    print(marketGroup.log)


#### index.py

In [None]:
from pandas import (
    DataFrame,
    read_json,
    to_datetime
)
from pykrx.stock import (
    get_index_ohlcv_by_date,
    get_nearest_business_day_in_a_week
)
from re import compile, search
from requests import get
from requests.exceptions import JSONDecodeError
from time import sleep, time
from typing import (
    Dict,
    List
)
from warnings import simplefilter
if "PATH" not in globals():
    try:
        from ...common.path import PATH
    except ImportError:
        from src.common.path import PATH

simplefilter(action='ignore', category=FutureWarning)


INDEX_CODE:Dict[str, str] = {
    '1001': 'KOSPI', '2001': 'KOSDAQ',
    'WI100': '에너지', 'WI110': '화학',
    'WI200': '비철금속', 'WI210': '철강', 'WI220': '건설', 'WI230': '기계', 'WI240': '조선', 'WI250': '상사,자본재', 'WI260': '운송',
    'WI300': '자동차', 'WI310': '화장품,의류', 'WI320': '호텔,레저', 'WI330': '미디어,교육', 'WI340': '소매(유통)',
    'WI400': '필수소비재', 'WI410': '건강관리',
    'WI500': '은행', 'WI510': '증권', 'WI520': '보험',
    'WI600': '소프트웨어', 'WI610': 'IT하드웨어', 'WI620': '반도체', 'WI630': 'IT가전', 'WI640': '디스플레이',
    'WI700': '통신서비스',
    'WI800': '유틸리티'
}


class MarketIndex(DataFrame):

    _log:List[str] = []

    def __init__(self, update:bool=True):
        stime = time()
        super().__init__(read_json(PATH.INDEX, orient='index'))
        self.index = self.index.date
        if not update:
            return

        trading_date = get_nearest_business_day_in_a_week()
        server_date = self.fetchServerDate()
        self.log = f'Begin [Market Index Fetch] @{trading_date}'

        for n, (code, name) in enumerate(INDEX_CODE.items()):
            proc = f'... ({n + 1} / {len(INDEX_CODE)}) : {code} {name} :: '
            start = self[code].dropna().index[-1]
            end = trading_date if code in ['1001', '2001'] else server_date
            if start == end:
                continue
            fetch = self.fetchWiseSeries(code, f'{start}', f'{end}')
            if fetch.empty:
                self.log = f'{proc}Fail'
                continue
            for dt in fetch.index:
                self.at[dt, code] = fetch.loc[dt, code]
            self.log = f'{proc}Success '
        self.log = f'End [Market Index Fetch] / Elapsed: {time() - stime:.2f}s'
        return

    @property
    def log(self) -> str:
        return "\n".join(self._log)

    @log.setter
    def log(self, log:str):
        self._log.append(log)

    @classmethod
    def _netDate2normDate(cls, timestamp:str):
        timestamp = int(search(r'\((\d+)\)', timestamp).group(1))
        return to_datetime(timestamp, unit='ms', utc=True) \
               .tz_convert('Asia/Seoul') \
               .date()

    @classmethod
    def fetchServerDate(cls) -> str:
        URL = 'https://www.wiseindex.com/Index/Index#/G1010.0.Components'
        pattern = compile(r"var\s+dt\s*=\s*'(\d{8})'")
        return pattern.search(get(URL).text).group(1)

    @classmethod
    def fetchWiseSeries(cls, code:str, start:str, end:str, countdown:int=5) -> DataFrame:
        if code in ['1001', '2001']:
            fetch = get_index_ohlcv_by_date(start, end, code, 'd', False)
            fetch.index = fetch.index.date
            fetch = fetch.rename(columns={"종가": code})
            return fetch

        resp = get(f'http://www.wiseindex.com/DataCenter/GridData?currentPage=1&endDT={end}&fromDT={start}&index_ids={code}&isEnd=1&itemType=1&perPage=10000&term=1')
        try:
            fetch = DataFrame(resp.json())[["TRD_DT", "IDX1_VAL1"]]
            fetch["TRD_DT"] = fetch["TRD_DT"].apply(cls._netDate2normDate)
            return fetch.rename(columns={"IDX1_VAL1": code}).set_index(keys="TRD_DT")
        except JSONDecodeError:
            if countdown == 0:
                return DataFrame()
            sleep(5)
            return cls.fetchWiseSeries(code, start, end, countdown - 1)


if __name__ == "__main__":
    marketIndex = MarketIndex(True)
    # print(marketIndex)
    print(marketIndex.log)


#### state.py

In [None]:
from datetime import datetime, timedelta
from io import StringIO
from pandas import (
    concat,
    DataFrame,
    Index,
    read_html,
    read_json,
)
from pykrx.stock import (
    get_exhaustion_rates_of_foreign_investment,
    get_nearest_business_day_in_a_week,
    get_market_cap_by_ticker,
    get_market_fundamental,
    get_market_ohlcv_by_date
)
from requests import get
from requests.exceptions import JSONDecodeError, SSLError
from time import time
from typing import Dict, Iterable, List
if "PATH" not in globals():
    try:
        from ...common.path import PATH
    except ImportError:
        from src.common.path import PATH


IPO_LABEL:Dict[str, str] = {
    '회사명':'name', '종목코드':'ticker',
    '상장일':'ipo', '주요제품':'products', '결산월':'settlementMonth'
}
CAP_LABEL:Dict[str, str] = {
    '종가':'close', '시가총액':'marketCap',
    '거래량':'volume', '거래대금':'amount', '상장주식수':'shares'
}
MUL_LABEL:Dict[str, str] = {
    'PER': 'PER', 'PBR': 'PBR', 'DIV': 'dividendYield'
}
PCT_LABEL:Dict[str, str] = {"지분율":'foreignRate'}
PRC_LABEL:Dict[str, str] = {
    "시가":"open", "고가":"high", "저가":"low", "종가":"close",
    "거래량":"volume", "거래대금":"amount"
}
INTERVALS:Dict[str, int] = {
    'D+0': 0, 'D-1': 1, 'W-1': 7,
    'M-1': 30, 'M-3': 91, 'M-6': 182, 'Y-1': 365
}


class MarketState(DataFrame):

    _log:List[str] = []

    def __init__(self, update:bool=True):
        stime = time()
        if not update:
            super().__init__(read_json(PATH.STATE, orient='index'))
            self.index = self.index.astype(str).str.zfill(6)
            return

        date = get_nearest_business_day_in_a_week()
        self.log = f'Begin [Market State Fetch] @{date}'

        fdef = [self.fetchMarketCap, self.fetchMultiples, self.fetchForeignRate]
        ks = concat([func(date, 'KOSPI') for func in fdef], axis=1)
        ks['market'] = 'kospi'
        self.log = f'... Fetch KOSPI Market State :: {"Fail" if ks.empty else "Success"}'
        kq = concat([func(date, 'KOSDAQ') for func in fdef], axis=1)
        kq['market'] = 'kosdaq'
        self.log = f'... Fetch KOSDAQ Market State :: {"Fail" if kq.empty else "Success"}'
        market = concat([ks, kq], axis=0)

        market = market[
            (~market.index.isin(self.fetchKonexList(date))) &
            (market.index.isin(self.fetchIpoList().index)) &
            (~market['shares'].isna())
        ]
        market = market[market['marketCap'] >= market['marketCap'].median()]

        returns = self.fetchReturns(date, market.index)
        self.log = f'... Fetch Returns :: {"Fail" if returns.empty else "Success"}'

        merge = returns.join(market, how='left')
        merge = merge.sort_values(by='marketCap', ascending=False)
        super().__init__(merge)

        self.log = f'End [Market State Fetch] / Elapsed: {time() - stime:.2f}s'
        return

    @property
    def log(self) -> str:
        return "\n".join(self._log)

    @log.setter
    def log(self, log:str):
        self._log.append(log)

    @classmethod
    def fetchKonexList(cls, date:str) -> Index:
        try:
            return get_market_cap_by_ticker(date=date, market='KONEX').index
        except (KeyError, RecursionError, JSONDecodeError, SSLError):
            return Index([])

    @classmethod
    def fetchIpoList(cls) -> DataFrame:
        _url = 'http://kind.krx.co.kr/corpgeneral/corpList.do?method=download'
        try:
            resp = StringIO(get(_url).text)
            df = read_html(io=resp, encoding='euc-kr')[0][IPO_LABEL.keys()] \
                 .rename(columns=IPO_LABEL) \
                 .set_index(keys='ticker')
            df.index = df.index.astype(str).str.zfill(6)
            return df
        except (KeyError, RecursionError, JSONDecodeError, SSLError):
            return DataFrame(columns=list(IPO_LABEL.values()))

    @classmethod
    def fetchMarketCap(cls, date:str, market:str='ALL') -> DataFrame:
        try:
            df = get_market_cap_by_ticker(date=date, market=market, alternative=True) \
                 .rename(columns=CAP_LABEL)
            df.index.name = 'ticker'
            return df
        except (KeyError, RecursionError, JSONDecodeError, SSLError):
            return DataFrame(columns=list(CAP_LABEL.values()))

    @classmethod
    def fetchMultiples(cls, date:str, market:str='ALL') -> DataFrame:
        try:
            df = get_market_fundamental(date=date, market=market) \
                 .rename(columns=MUL_LABEL)
            df.index.name = "ticker"
            return df[MUL_LABEL.values()]
        except (KeyError, RecursionError, JSONDecodeError, SSLError):
            return DataFrame(columns=list(MUL_LABEL.values()))

    @classmethod
    def fetchForeignRate(cls, date:str, market:str='ALL') -> DataFrame:
        try:
            df = get_exhaustion_rates_of_foreign_investment(date=date, market=market) \
                 .rename(columns=PCT_LABEL)
            df.index.name = 'ticker'
            return round(df[PCT_LABEL.values()].astype(float), 2)
        except (KeyError, RecursionError, JSONDecodeError, SSLError):
            return DataFrame(columns=list(PCT_LABEL.values()))

    @classmethod
    def fetchReturns(cls, date:str, tickers:Iterable=None) -> DataFrame:
        tdate = datetime.strptime(date, "%Y%m%d")
        intv = {key: tdate - timedelta(val) for key, val in INTERVALS.items()}
        objs = {
            key: cls.fetchMarketCap(val.strftime("%Y%m%d"))
            for key, val in intv.items()
        }
        base = concat(objs, axis=1)
        base = base[base.index.isin(tickers)]

        returns = concat({
            dt: base[dt]['close'] / base['D+0']['close'] - 1 for dt in objs
        }, axis=1)
        returns.drop(columns=['D+0'], inplace=True)

        diff = base[base['Y-1']['shares'] != base['D+0']['shares']].index
        fdate = (tdate - timedelta(380)).strftime("%Y%m%d")
        for ticker in diff:
            ohlc = get_market_ohlcv_by_date(fromdate=fdate, todate=date, ticker=ticker)
            for interval in returns.columns:
                ohlc_copy = ohlc[ohlc.index >= intv[interval]]['종가']
                returns.loc[ticker, interval] = ohlc_copy.iloc[-1] / ohlc_copy.iloc[0] - 1
        return round(100 * returns, 2)


if __name__ == "__main__":
    marketState = MarketState(True)
    # print(marketState)
    print(marketState.log)



  df = df.replace('', 0)
  df = df.replace('', 0)


Begin [Market State Fetch] @20250211
... Fetch KOSPI Market State :: Success
... Fetch KOSDAQ Market State :: Success
... Fetch Returns :: Success
End [Market State Fetch] / Elapsed: 231.89s


In [None]:
marketState.to_json(orient='index').replace('nan', '')

'{"005930":{"D-1":-0.18,"W-1":-5.39,"M-1":-0.72,"M-3":-4.85,"M-6":36.62,"Y-1":33.03,"close":55700,"marketCap":332516888035000,"volume":24596196,"amount":1370657724557,"shares":5969782550,"PER":26.14,"PBR":1.07,"dividendYield":2.59,"foreignRate":49.91,"market":"kospi"},"000660":{"D-1":-0.8,"W-1":-4.31,"M-1":1.9,"M-3":-6.96,"M-6":-8.91,"Y-1":-28.49,"close":199700,"marketCap":145382072290500,"volume":2488710,"amount":496390495100,"shares":728002365,"PER":0.0,"PBR":2.57,"dividendYield":0.6,"foreignRate":55.81,"market":"kospi"},"207940":{"D-1":-3.75,"W-1":-8.69,"M-1":-14.82,"M-3":-15.93,"M-6":-20.02,"Y-1":-30.58,"close":1174000,"marketCap":83558276000000,"volume":91316,"amount":106986690000,"shares":71174000,"PER":97.42,"PBR":8.5,"dividendYield":0.0,"foreignRate":13.56,"market":"kospi"},"373220":{"D-1":1.02,"W-1":-2.92,"M-1":1.6,"M-3":24.49,"M-6":-3.94,"Y-1":14.87,"close":343000,"marketCap":80262000000000,"volume":178558,"amount":61654136000,"shares":234000000,"PER":64.88,"PBR":3.97,"divide

#### spec.py

In [3]:
from datetime import datetime
from io import StringIO
from numpy import nan
from pandas import (
    concat,
    DataFrame,
    read_html,
    read_json,
    Series
)
from pykrx.stock import get_market_cap_by_ticker
from re import DOTALL, sub
from requests import get
from requests.exceptions import JSONDecodeError, SSLError
from time import time
from typing import Dict, List, Union, Tuple
from xml.etree.ElementTree import Element, fromstring
if "PATH" not in globals():
    try:
        from ...common.path import PATH
    except ImportError:
        from src.common.path import PATH


CAP_LABEL: Dict[str, str] = {
    '종가': 'close', '시가총액': 'marketCap',
    '거래량': 'volume', '거래대금': 'amount', '상장주식수': 'shares'
}
IPO_LABEL = {
    '회사명': 'name', '종목코드': 'ticker',
    '상장일': 'ipo', '주요제품': 'products', '결산월': 'settlementMonth'
}
OVERVIEW_TAG: Dict[str, str] = {
    'high52': 'price/high52week',
    'low52': 'price/low52week',
    'beta': 'price/beta',
    'floatShares': 'price/ff_sher_rt',
    'estPrice': 'consensus/target_price',
    'estEps': 'consensus/eps'
}
STATEMENT_TAG: Dict[str, str] = {
    'consolidateAnnual': f'financial_highlight_ifrs_D/financial_highlight_annual',
    'consolidateQuarter': f'financial_highlight_ifrs_D/financial_highlight_quarter',
    'separateAnnual': f'financial_highlight_ifrs_B/financial_highlight_annual',
    'separateQuarter': f'financial_highlight_ifrs_B/financial_highlight_quarter'
}


class MarketSpec(DataFrame):
    _log: List[str] = []

    def __init__(self, update: bool = True):
        stime = time()
        if not update:
            super().__init__(read_json(PATH.SPEC, orient='index'))
            self.index = self.index.astype(str).str.zfill(6)
            return

        date = datetime.today().strftime("%Y%m%d")
        self.log = f'Begin [Market Spec Fetch] @{date}'

        market = concat([self.fetchMarketCap(date, 'KOSPI'), self.fetchMarketCap(date, 'KOSDAQ')])
        market = market[
            (market.index.isin(self.fetchIpoList().index)) &
            (~market['shares'].isna())
        ]
        market = market[market['marketCap'] >= market['marketCap'].median()]

        objs = []
        for n, ticker in enumerate(market.index):
            try:
                xml = self.fetchXml(ticker)
                obj = self.fetchOverview(xml)
                obj.name = ticker
                A, Q = self.fetchStatement(xml)
                if A.empty or Q.empty:
                    objs.append(obj)
                    continue

                Aa, Qq = self.customizeStatement(A), self.customizeStatement(Q)
                obj['trailingRevenue'] = Qq.iloc[-1]['trailingRevenue']
                obj['trailingEps'] = Qq.iloc[-1]['trailingEps']
                obj['trailingProfitRate'] = Qq.iloc[-1]['trailingProfitRate']
                obj['averageRevenueGrowth_A'] = Aa['revenueGrowth'].mean()
                obj['averageProfitGrowth_A'] = Aa['profitGrowth'].mean()
                obj['averageEpsGrowth_A'] = Aa['epsGrowth'].mean()
                obj['RevenueGrowth_A'] = Aa.iloc[-1]['revenueGrowth']
                obj['RevenueGrowth_Q'] = Qq.iloc[-1]['revenueGrowth']
                obj['ProfitGrowth_A'] = Aa.iloc[-1]['profitGrowth']
                obj['ProfitGrowth_Q'] = Qq.iloc[-1]['profitGrowth']
                obj['EpsGrowth_A'] = Aa.iloc[-1]['epsGrowth']
                obj['EpsGrowth_Q'] = Qq.iloc[-1]['epsGrowth']

                dividend = Aa['배당수익률(%)'].dropna()
                obj['fiscalDividendYield'] = dividend.values[-1] if not dividend.empty else nan

                debt = Aa['부채비율(%)'].dropna()
                obj['fiscalDebtRatio'] = debt.values[-1] if not debt.empty else nan
                objs.append(obj)
            except Exception as reason:
                self.log = f'... Failed to fetch: {ticker} / {reason}'

        super().__init__(concat(objs, axis=1).T)
        for col in self:
            self[col] = round(self[col], 4 if col == 'beta' else 2)

        self.log = f'End [Market Spec Fetch] {len(self)} Stocks / Elapsed: {time() - stime:.2f}s'
        return

    @property
    def log(self) -> str:
        return "\n".join(self._log)

    @log.setter
    def log(self, log: str):
        self._log.append(log)

    @classmethod
    def _format(cls, num) -> Union[int, float]:
        if not num:
            return nan
        try:
            return float(num) if "." in num else int(num)
        except ValueError:
            num = "".join([c for c in num if c.isdigit() or c in [".", "-"]])
            if not num or num == "." or num == "-":
                return nan
            return float(num) if "." in num else int(num)

    @classmethod
    def fetchMarketCap(cls, date: str, market:str='ALL') -> DataFrame:
        try:
            df = get_market_cap_by_ticker(date=date, market=market, alternative=True) \
                .rename(columns=CAP_LABEL)
            df.index.name = 'ticker'
            return df
        except (KeyError, RecursionError, JSONDecodeError, SSLError):
            return DataFrame(columns=list(CAP_LABEL.values()))

    @classmethod
    def fetchIpoList(cls) -> DataFrame:
        _url = 'http://kind.krx.co.kr/corpgeneral/corpList.do?method=download'
        try:
            resp = StringIO(get(_url).text)
            df = read_html(io=resp, encoding='euc-kr')[0][IPO_LABEL.keys()] \
                .rename(columns=IPO_LABEL) \
                .set_index(keys='ticker')
            df.index = df.index.astype(str).str.zfill(6)
            return df
        except (KeyError, RecursionError, JSONDecodeError, SSLError):
            return DataFrame(columns=list(IPO_LABEL.values()))

    @classmethod
    def fetchXml(cls, ticker: str, debug: bool = False) -> Union[str, Element]:
        resp = get(url=f"http://cdn.fnguide.com/SVO2/xml/Snapshot_all/{ticker}.xml")
        resp.encoding = 'euc-kr'
        text = resp.text.replace("<![CDATA[", "").replace("]]>", "")
        text = sub(r'<business_summary>.*?</business_summary>', '', text, flags=DOTALL)
        return text if debug else fromstring(text)

    @classmethod
    def fetchOverview(cls, xml: Element) -> Series:
        data = {}
        for key, tag in OVERVIEW_TAG.items():
            ftag = xml.find(tag)
            data[key] = None if ftag is None else ftag.text
        return Series(data=data).apply(cls._format)

    @classmethod
    def fetchStatement(cls, ticker_or_xml: Union[str, Element]) -> Tuple[DataFrame, DataFrame]:
        xml = cls.fetchXml(ticker_or_xml) if isinstance(ticker_or_xml, str) else ticker_or_xml

        def _statement(tag: str) -> DataFrame:
            obj = xml.find(tag)
            if obj is None:
                return DataFrame()
            columns = [val.text for val in obj.findall('field')]
            index, data = [], []
            for record in obj.findall('record'):
                index.append(record.find('date').text)
                data.append([val.text for val in record.findall('value')])
            df = DataFrame(index=index, columns=columns, data=data)
            return df.map(cls._format)

        B_A = _statement(STATEMENT_TAG['separateAnnual'])
        D_A = _statement(STATEMENT_TAG['consolidateAnnual'])
        if B_A.empty or D_A.empty:
            return DataFrame(), DataFrame()
        if B_A.count().sum() > D_A.count().sum():
            A = B_A
            Q = _statement(STATEMENT_TAG['separateQuarter'])
        else:
            A = D_A
            Q = _statement(STATEMENT_TAG['consolidateQuarter'])
        return A, Q

    @classmethod
    def customizeStatement(cls, statement: DataFrame, include_estimated: bool = False) -> DataFrame:
        st = statement.copy()
        if not include_estimated:
            st = st[~st.index.str.endswith('(E)')].copy()
        if st[st.index.str.endswith('(P)')].count().sum() == 0:
            st = st[~st.index.str.endswith('(P)')].copy()
        else:
            st.index = st.index.str.replace(r'\(P\)', '', regex=True)
        st['trailingRevenue'] = st[st.columns[0]].rolling(window=4, min_periods=1).sum()
        st['trailingProfit'] = st['영업이익(억원)'].rolling(window=4, min_periods=1).sum()
        st['trailingEps'] = st['EPS(원)'].rolling(window=4, min_periods=1).sum()
        st['trailingProfitRate'] = st['trailingProfit'] / st['trailingRevenue'] * 100
        st['revenueGrowth'] = 100 * (st[st.columns[0]].diff() / st[st.columns[0]].abs().shift(1))
        st['profitGrowth'] = 100 * (st['영업이익(억원)'].diff() / st['영업이익(억원)'].abs().shift(1))
        st['epsGrowth'] = 100 * (st['EPS(원)'].diff() / st['EPS(원)'].abs().shift(1))
        return st


if __name__ == "__main__":
    marketSpec = MarketSpec(True)
    # print(marketSpec)
    print(marketSpec.log)


Begin [Market Spec Fetch] @20250214
End [Market Spec Fetch] 1318 Stocks / Elapsed: 623.77s


In [18]:
# 별도: 033100, 058470
# 연결: 005930, 000660, 005380, 316140
# Columns:
# ['매출액(억원)', '영업이익(억원)', '영업이익(발표기준)', '당기순이익(억원)', '  지배주주순이익(억원)',
#  '비지배주주순이익(억원)', '자산총계(억원)', '부채총계(억원)', '자본총계(억원)', '  지배주주지분(억원)',
#  '비지배주주지분(억원)', '자본금(억원)', '부채비율(%)', '유보율(%)', '영업이익률(%)',
#  '지배주주순이익률(%)', 'ROA(%)', 'ROE(%)', 'EPS(원)', 'BPS(원)', 'DPS(원)',
#  'PER(배)', 'PBR(배)', '발행주식수(천주)', '배당수익률(%)']
xml = MarketSpec.fetchXml('316140', debug=False)
# xml
ovv = MarketSpec.fetchOverview(xml)
a, q = MarketSpec.fetchStatement(xml)
Aa, Qq = MarketSpec.customizeStatement(a), MarketSpec.customizeStatement(q)

# ovv
# a
# Aa
# Qq

'<?xml version="1.0" encoding="euc-kr"?>\r\n<xml>\r\n<price>\r\n<date>2025/02/12</date>\r\n<close_val>16,850</close_val>\r\n<bef_close_val_calc>450</bef_close_val_calc>\r\n<high52week>17,200</high52week>\r\n<low52week>13,170</low52week>\r\n<change_1month>8.71</change_1month>\r\n<change_3month>4.59</change_3month>\r\n<change_6month>14.24</change_6month>\r\n<change_12month>15.33</change_12month>\r\n<change_36month>11.22</change_36month>\r\n<market_sum>125,127</market_sum>\r\n<listed_stock_1>742,591,501</listed_stock_1>\r\n<listed_stock_2>0</listed_stock_2>\r\n<deal_cnt>3,196,830</deal_cnt>\r\n<deal_amt>539</deal_amt>\r\n<frgn_rate>46.19</frgn_rate>\r\n<beta>0.74658</beta>\r\n<face_value>5,000</face_value>\r\n<ff_sher>654,893,082</ff_sher>\r\n<ff_sher_rt>88.19</ff_sher_rt>\r\n<mkt_cap_1>125,127</mkt_cap_1>\r\n<face_value_unit>원</face_value_unit>\r\n<face_value_chg_dt></face_value_chg_dt>\r\n</price>\r\n<credit_grade>\r\n<record>\r\n<year>2023</year>\r\n<grade>AAA</grade>\r\n</record>\r\n<

In [4]:
# marketSpec
marketSpec.to_json(orient='index').replace('nan', '')

'{"005930":{"high52":87800.0,"low52":49900.0,"beta":1.2506,"floatShares":75.73,"estPrice":73520.0,"estEps":4328.0,"trailingRevenue":3008709.3399999999,"trailingEps":4950.0,"trailingProfitRate":10.88,"averageRevenueGrowth_A":6.16,"averageProfitGrowth_A":74.11,"averageEpsGrowth_A":33.99,"RevenueGrowth_A":16.2,"RevenueGrowth_Q":-4.19,"ProfitGrowth_A":398.34,"ProfitGrowth_Q":-29.3,"EpsGrowth_A":132.29,"EpsGrowth_Q":-22.57,"fiscalDividendYield":1.84,"fiscalDebtRatio":25.36},"000660":{"high52":241000.0,"low52":146800.0,"beta":1.7359,"floatShares":74.08,"estPrice":272880.0,"estEps":33900.0,"trailingRevenue":661929.6,"trailingEps":27183.0,"trailingProfitRate":35.45,"averageRevenueGrowth_A":26.44,"averageProfitGrowth_A":75.37,"averageEpsGrowth_A":-5.85,"RevenueGrowth_A":102.02,"RevenueGrowth_Q":12.48,"ProfitGrowth_A":403.58,"ProfitGrowth_Q":14.98,"EpsGrowth_A":317.16,"EpsGrowth_Q":39.17,"fiscalDividendYield":0.85,"fiscalDebtRatio":87.52},"207940":{"high52":1174000.0,"low52":727000.0,"beta":0.48

## build

### service

#### baseline.py

In [None]:
from pandas import (
    DataFrame,
    read_json
)


class Baseline(DataFrame):
    def __init__(self, update:bool=True):
        if not update:
            super().__init__(read_json(PATH.BASELINE, orient='index'))
            return
        super().__init__()
        return




##### marketmap.py

In [None]:
class MarketMap(DataFrame):
    pass

##### bubble.py

In [None]:
class MarketBubble(DataFrame):
    pass

### market.py

In [None]:
pass

### cache.py

In [None]:
pass

# Experiment

In [None]:
# xml = MarketSpec.fetchXml('088980', True)
xml = MarketSpec.fetchXml('088980')
# MarketSpec.fetchOverview(xml)
a, q = MarketSpec.fetchStatement(xml, False)
# print(xml)
q

Empty DataFrame
Columns: []
Index: []


IndexError: single positional indexer is out-of-bounds