<a href="https://colab.research.google.com/github/labwons/pages/blob/main/datacenter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pykrx yfinance



In [None]:
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from lxml import html
from pandas import DataFrame, Series, DatetimeIndex, DateOffset
from pytz import timezone
from pykrx import stock
from requests.exceptions import JSONDecodeError, SSLError
from typing import Dict, Iterable, List, Union
from yfinance import Ticker
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
import requests, json, re, time, os, smtplib, io, warnings
warnings.filterwarnings("ignore")

# COMMON

## path.py

In [None]:
class PATH:
    """Locations of the JSON resources read and written by this notebook.

    ``ROOT`` is the directory two levels above this file when ``__file__`` is
    defined. When it is not (``NameError``), ``ROOT`` falls back to the raw
    GitHub URL of the repository, so every path below becomes a URL.
    """

    try:
        ROOT = os.path.join(os.path.dirname(__file__), "../../")
    except NameError:
        ROOT = 'https://raw.githubusercontent.com/labwons/pages/main/'

    # Macro data
    INDEX = os.path.join(ROOT, '_data/json/macro/index.json')

    # Per-group data
    GROUP = os.path.join(ROOT, '_data/json/group/group.json')
    STATE = os.path.join(ROOT, '_data/json/group/state.json')
    PRICE = os.path.join(ROOT, '_data/json/group/price.json')
    SPECS = os.path.join(ROOT, '_data/json/group/specs.json')


## calendar.py

In [None]:
class Calendar(object):
    """Trading-day calendar built from roughly five years of daily prices.

    The span of trading days is taken from the daily history of ticker
    005930 (pykrx first, yfinance as a fallback when pykrx returns nothing).

    Supported operations:
        * ``date in calendar`` - membership test (str / datetime / date)
        * ``len(calendar)``    - number of trading days in the span
        * ``calendar[n]``      - n-th trading day (integer index)
        * ``calendar['M-3']``  - latest trading day on or before the last
          trading day minus the given days/weeks/months/years
        * ``iter(calendar)``   - ``(interval, date)`` pairs for the standard
          look-back intervals
        * ``str(calendar)``    - last trading day formatted with ``format``
    """

    # Set Current Time-Zone
    NOW = datetime.now(timezone('Asia/Seoul'))

    # Time Span
    SPAN = []

    # Display Format
    FORMAT = "%Y-%m-%d"

    def __init__(self):

        # Cache: interval key (as given by the caller) -> resolved date
        self.__mem__ = {}

        # Fetching Time-span of the Market
        # Looking for 5 years of market data; the reference point is the
        # execution time (@NOW).
        # Resourced from pykrx (NAVER, KRX) or yfinance (Yahoo Finance).
        # pykrx has priority over yfinance (in case of a server block).
        # The interval is 1 day; weekends and holidays are not included.
        _start = self.NOW - timedelta(days=365 * 5)
        _basis = stock.get_market_ohlcv_by_date(
            fromdate=_start.strftime("%Y%m%d"),
            todate=self.NOW.strftime("%Y%m%d"),
            ticker='005930',
            freq='d',
            adjusted=True,
            name_display=False
        )
        if _basis.empty:
            _basis = Ticker('005930.KS').history(
                start=_start.date(),
                end=self.NOW.date(),
                interval='1d'
            )
        else:
            # During the 09:30-15:30 window of a weekday, today's row is
            # still changing, so it is excluded. Only drop the last row when
            # it really is today's row: on a weekday holiday the last row is
            # a completed session and must be kept.
            # weekday() is used instead of strftime("%A"), whose output
            # depends on the process locale.
            in_session = self.NOW.weekday() < 5 \
                         and 930 <= int(self.NOW.strftime("%H%M")) < 1531
            if in_session and _basis.index[-1].date() == self.NOW.date():
                _basis = _basis.iloc[:-1]
        self.SPAN = _basis.index.date
        return

    @property
    def format(self) -> str:
        """strftime format used by ``str(calendar)``."""
        return self.FORMAT

    @format.setter
    def format(self, _format:str):
        self.FORMAT = _format

    def __contains__(self, date:Union[str, datetime, datetime.date]) -> bool:
        """Return True when *date* is a trading day of the span.

        Strings are parsed as ``YYYY-MM-DD``, ``YYYY.MM.DD`` or ``YYYYMMDD``.
        """
        if isinstance(date, str):
            if "-" in date:
                date = datetime.strptime(date, "%Y-%m-%d")
            elif "." in date:
                date = datetime.strptime(date, "%Y.%m.%d")
            else:
                date = datetime.strptime(date, "%Y%m%d")
        if isinstance(date, datetime):
            date = date.date()
        return date in self.SPAN

    def __iter__(self) -> iter:
        """Yield ``(interval, date)`` for the standard look-back intervals."""
        for interval in ('D-1', 'W-1', 'M-1', 'M-3', 'M-6', 'Y-1'):
            yield interval, self[interval]

    def __len__(self) -> int:
        return len(self.SPAN)

    def __getitem__(self, n_or_interval:Union[int, str]) -> datetime.date:
        """Return a trading day by position or by look-back interval.

        :param n_or_interval: integer position in the span, or a string such
                              as ``'D-1'``, ``'W-1'``, ``'M-3'``, ``'Y-1'``
        :raises KeyError: when the interval string is malformed or reaches
                          back before the first day of the span
        """
        if isinstance(n_or_interval, int):
            return self.SPAN[n_or_interval]
        if '-' not in n_or_interval:
            raise KeyError(f'Wrong interval format: {n_or_interval}')
        if n_or_interval in self.__mem__:
            return self.__mem__[n_or_interval]
        interval = n_or_interval.lower()
        if interval.startswith('d'):
            key = 'days'
        elif interval.startswith('w'):
            key = 'weeks'
        elif interval.startswith('m'):
            key = 'months'
        elif interval.startswith('y'):
            key = 'years'
        else:
            raise KeyError(f'Wrong interval format: {n_or_interval}')

        try:
            amount = int(interval[2:])
        except ValueError:
            raise KeyError(f'Wrong interval format: {n_or_interval}') from None

        offset = (self[-1] - DateOffset(**{key: amount})).date()
        first = self.SPAN[0]
        # Walk back day by day until a trading day is hit. Stop at the start
        # of the span: without this bound a too-distant interval would keep
        # stepping backwards far beyond the available data.
        while offset not in self:
            if offset < first:
                raise KeyError(f'Interval out of calendar span: {n_or_interval}')
            offset = (offset - DateOffset(days=1)).date()
        self.__mem__[n_or_interval] = offset
        return offset

    def __str__(self) -> str:
        return self[-1].strftime(self.format)


# Alias Override
# Rebind the name ``Calendar`` to one shared instance. The constructor (and
# the market-data fetch it performs) therefore runs once, right here, and the
# class itself is no longer reachable through this name afterwards.
Calendar = Calendar()

## log.py

In [None]:
class classproperty:
    """Read-only descriptor that evaluates the wrapped function on the class.

    Whether accessed through the class or through an instance, the wrapped
    function receives the owning class as its only argument.
    """

    def __init__(self, fget):
        self.fget = fget

    def __get__(self, instance, owner):
        getter = self.fget
        return getter(owner)

class Log:
    """Process-wide text log that can be mailed out at the end of a run.

    All state lives on the class itself; the class is used without being
    instantiated.
    """

    SENDER:str = 'snob.labwons@gmail.com'
    RECEIVER:str = 'jhlee_0319@naver.com'

    # When False, append() is a no-op
    active: bool = True
    # Accumulated log text
    text:str = ''

    @classmethod
    def append(cls, message:str):
        """Append *message* to the log text when logging is active."""
        if cls.active:
            cls.text += message
        return

    @classmethod
    def send(cls):
        """Mail the accumulated log text from SENDER to RECEIVER.

        Delivery is best effort: any failure is swallowed and ``None`` is
        returned, so a mail problem never interrupts the data pipeline.
        """
        message = MIMEMultipart()
        message['Subject'] = f"{Calendar} Log"
        message['From'] = cls.SENDER
        message['To'] = cls.RECEIVER
        message.attach(MIMEText(cls.text))

        # SECURITY(review): a credential is committed in source. It is kept
        # only as a fallback so existing runs keep working; prefer supplying
        # it through the LOG_SMTP_PASSWORD environment variable, then rotate
        # and remove the literal.
        password = os.environ.get('LOG_SMTP_PASSWORD', "puiz yxql tnoe ivaa")
        try:
            with smtplib.SMTP('smtp.gmail.com', 587) as server:
                # The original referenced ``server.ehlo`` without calling it.
                server.ehlo()
                server.starttls()
                # Re-identify over the encrypted channel.
                server.ehlo()
                server.login(cls.SENDER, password)
                server.send_message(message)
        except Exception:
            # Deliberate best effort (see docstring).
            return
        return

    @classproperty
    def failcount(cls) -> int:
        """Number of occurrences of the substring 'Fail' in the log text."""
        return cls.text.count('Fail')

## web.py

In [None]:
class Web:
    """Small caching layer over ``requests`` for pages fetched repeatedly.

    Responses are cached per URL in ``__req__`` and parsed HTML tables per
    URL in ``__tbl__``; both caches live on the class.
    """

    __req__:dict = {}
    __tbl__:dict = {}

    @classmethod
    def get(cls, url:str, encoding:str='', **kwargs):
        """Return the (cached) response for *url*, or None on a non-200 status.

        :param encoding: when given, overrides the response encoding before
                         the response is cached
        :param kwargs: accepted for call compatibility; not used
        """
        if url not in cls.__req__:
            resp = requests.get(url)
            # Compare by value: ``is 200`` tests object identity and only
            # works by accident of small-integer caching.
            if resp.status_code != 200:
                return
            if encoding:
                resp.encoding = encoding
            cls.__req__[url] = resp
        return cls.__req__[url]

    @classmethod
    def text(cls, url:str, **kwargs):
        """Return the response text with CDATA markers and the
        ``<business_summary>`` element removed, or None when the fetch failed.
        """
        resp = cls.get(url, **kwargs)
        if resp is None:
            return
        text = resp.text.replace("<![CDATA[", "").replace("]]>", "")
        text = re.sub(r'<business_summary>.*?</business_summary>', '', text, flags=re.DOTALL)
        return text

    @classmethod
    def parser(cls, url:str, parser:str='', **kwargs) -> BeautifulSoup:
        """Return a BeautifulSoup document for *url*.

        When *parser* is empty, "xml" is used for URLs ending in 'xml' and
        "lxml" otherwise.
        """
        if not parser:
            parser = "xml" if url.endswith('xml') else "lxml"
        return BeautifulSoup(cls.text(url, **kwargs), parser)

    @classmethod
    def tables(cls, url:str, **kwargs) -> List[DataFrame]:
        """Return the (cached) list of HTML tables found at *url*.

        :param kwargs: forwarded to ``pandas.read_html``
        """
        if url in cls.__tbl__:
            return cls.__tbl__[url]
        text = cls.text(url)
        # Passing literal HTML to read_html is deprecated; wrap it in a
        # file-like object. A failed fetch (None) is passed through unchanged
        # so it fails exactly as before.
        source = io.StringIO(text) if isinstance(text, str) else text
        df = pd.read_html(io=source, **kwargs)
        cls.__tbl__[url] = df
        return df

    @classmethod
    def refresh(cls, url:str, encoding:str=''):
        """Re-fetch *url*, replace the cached response and return it.

        Returns None (leaving the cache untouched) on a non-200 status.
        """
        resp = requests.get(url)
        if resp.status_code != 200:
            return
        # The encoding argument was previously accepted but ignored.
        if encoding:
            resp.encoding = encoding
        cls.__req__[url] = resp
        return cls.__req__[url]

# ticker = '088980'
# Web.text(f"http://cdn.fnguide.com/SVO2/xml/Snapshot_all/{ticker}.xml", encoding='euc-kr')

# GENERALS

## WISE

### core.py

In [None]:
# WISE sector code -> sector name (Korean); iteration order is the fetch order
CDSEC = {
    'WI100': '에너지',
    'WI110': '화학',
    'WI200': '비철금속',
    'WI210': '철강',
    'WI220': '건설',
    'WI230': '기계',
    'WI240': '조선',
    'WI250': '상사,자본재',
    'WI260': '운송',
    'WI300': '자동차',
    'WI310': '화장품,의류',
    'WI320': '호텔,레저',
    'WI330': '미디어,교육',
    'WI340': '소매(유통)',
    'WI400': '필수소비재',
    'WI410': '건강관리',
    'WI500': '은행',
    'WI510': '증권',
    'WI520': '보험',
    'WI600': '소프트웨어',
    'WI610': 'IT하드웨어',
    'WI620': '반도체',
    'WI630': 'IT가전',
    'WI640': '디스플레이',
    'WI700': '통신서비스',
    'WI800': '유틸리티',
}

# Source column name -> column name used in this notebook
CDCOL = {
    'CMP_CD': 'ticker',
    'CMP_KOR': 'name',
    'SEC_CD': 'sectorCode',
    'SEC_NM_KOR': 'sectorName',
    'IDX_CD': 'industryCode',
    'IDX_NM_KOR': 'industryName',
}

def convertNet2Date(timestamp:str) -> datetime.date:
    """Convert a timestamp string holding epoch milliseconds in parentheses
    (e.g. ``"/Date(1700000000000)/"``) into a calendar date in Asia/Seoul.

    :param timestamp: text containing ``(<digits>)``; the digits are read as
                      milliseconds since the Unix epoch (UTC)
    :return: the corresponding date in the Asia/Seoul time zone
    :raises ValueError: when no parenthesised digit run is present
    """
    matched = re.search(r'\((\d+)\)', timestamp)
    if matched is None:
        # Fail with a message that names the offending input instead of an
        # AttributeError on None.
        raise ValueError(f'Unrecognized timestamp format: {timestamp!r}')
    millis = int(matched.group(1))
    return pd.to_datetime(millis, unit='ms', utc=True) \
            .tz_convert('Asia/Seoul') \
            .date()

def separateMediaAndEducation(df:DataFrame) -> DataFrame:
    """Return a copy of *df* whose ``IDX_NM_KOR`` is "미디어" for rows in the
    "커뮤니케이션서비스" sector and "교육" for every other row.

    The input frame is left untouched.
    """
    result = df.copy()
    is_media = result["SEC_NM_KOR"] == "커뮤니케이션서비스"
    result.loc[is_media, "IDX_NM_KOR"] = "미디어"
    result.loc[~is_media, "IDX_NM_KOR"] = "교육"
    return result

def separateSwAndITservice(df:DataFrame) -> DataFrame:
    """Return a copy of *df* whose ``IDX_NM_KOR`` is "IT서비스" for rows in
    the "IT" sector; all other rows keep their value.

    The input frame is left untouched.
    """
    result = df.copy()
    is_it_sector = result["SEC_NM_KOR"] == 'IT'
    result.loc[is_it_sector, 'IDX_NM_KOR'] = 'IT서비스'
    return result

### fetch.py

In [None]:
def fetchWiseDate() -> datetime.date:
    """Fetch the WISE index page and return the date embedded in its
    ``var dt = 'YYYYMMDD'`` script variable.

    Performs one HTTP request per call.
    """
    date_pattern = re.compile(r"var\s+dt\s*=\s*'(\d{8})'")
    page = requests.get('https://www.wiseindex.com/Index/Index#/G1010.0.Components').text
    found = date_pattern.search(page)
    yyyymmdd = found.group(1)
    return datetime.strptime(yyyymmdd, "%Y%m%d").date()

def fetchWiseGroup(code:str, date:str="", countdown:int=5) -> DataFrame:
    """Fetch the component list of one WISE sector.

    :param code: sector code passed as ``sec_cd``
    :param date: ``YYYYMMDD``; when empty, the date from fetchWiseDate() is used
    :param countdown: number of retries after a JSON decoding failure; each
                      retry waits 5 seconds
    :return: frame built from the ``list`` field of the response, or an
             empty frame when every attempt failed to decode
    """
    _url = 'http://www.wiseindex.com/Index/GetIndexComponets?ceil_yn=0&dt=%s&sec_cd=%s'
    if date == "":
        date = fetchWiseDate().strftime("%Y%m%d")

    remaining = countdown
    while True:
        resp = requests.get(_url % (date, code))
        try:
            return DataFrame(resp.json()['list'])
        except JSONDecodeError:
            if remaining <= 0:
                return DataFrame()
            time.sleep(5)
            remaining -= 1

def fetchWiseSeries(code:str, fromDT:str="", endDT:str="", countdown:int=5) -> DataFrame:
    """Fetch the daily value history of one WISE index.

    :param code: index id; also used as the name of the value column
    :param fromDT: start date ``YYYY-MM-DD``; defaults to '2000-01-01'
    :param endDT: end date ``YYYY-MM-DD``; defaults to the fetchWiseDate() date
    :param countdown: number of retries after a JSON decoding failure; each
                      retry waits 5 seconds
    :return: single-column frame (column *code*) indexed by ``date``. When
             every attempt fails to decode, an empty frame of the same layout
             is returned, so callers can treat success and failure uniformly
             (previously the failure path returned an empty Series).
    """
    _url = 'http://www.wiseindex.com/DataCenter/GridData?currentPage=1&endDT=%s&fromDT=%s&index_ids=%s&isEnd=1&itemType=1&perPage=10000&term=1'
    edt = endDT if endDT else fetchWiseDate().strftime("%Y-%m-%d")
    sdt = fromDT if fromDT else '2000-01-01'

    remaining = countdown
    while True:
        resp = requests.get(_url % (edt, sdt, code))
        try:
            df = DataFrame(resp.json())[["TRD_DT", "IDX1_VAL1"]]
        except JSONDecodeError:
            if remaining <= 0:
                empty = DataFrame(columns=[code])
                empty.index.name = "date"
                return empty
            time.sleep(5)
            remaining -= 1
            continue
        df["TRD_DT"] = df["TRD_DT"].apply(convertNet2Date)
        return df.rename(columns={"IDX1_VAL1": code, "TRD_DT": "date"}).set_index("date")

### generic.py

In [None]:
# Date reported by the WISE index page, fetched once when this cell runs
# (fetchWiseDate() performs an HTTP request).
GENTIME = fetchWiseDate()

class Groups(DataFrame):
    """Ticker -> sector / industry classification table.

    With ``readonly=True`` the table is loaded from PATH.GROUP; otherwise it
    is rebuilt by fetching every sector listed in CDSEC for the date GENTIME.
    """

    def __init__(self, readonly:bool=False):
        """
        :param readonly: load the stored JSON instead of fetching
        """
        if readonly:
            super().__init__(pd.read_json(PATH.GROUP, orient='index'))
            # Restore tickers as 6-character, zero-padded strings
            self.index = self.index.astype(str).str.zfill(6)
            self.index.name = 'ticker'
            return

        # readonly is always False here, so this switches logging on
        Log.active = not readonly
        Log.append(f"Fetching WISE Group @Date: {GENTIME}\n")

        objs = []
        for n, (code, name) in enumerate(CDSEC.items()):
            Log.append(f"... ({n + 1} / {len(CDSEC)}) {code} / {name}: ")
            fetch = fetchWiseGroup(code, GENTIME.strftime("%Y%m%d"))
            if fetch.empty:
                # A failed sector is logged and skipped
                Log.append("Failed\n")
                continue
            # Two sectors have their industry names rewritten
            if code == 'WI330':
                fetch = separateMediaAndEducation(fetch)
            if code == 'WI600':
                fetch = separateSwAndITservice(fetch)
            objs.append(fetch)
            Log.append("Success\n")
        # NOTE(review): pd.concat raises when objs is empty, i.e. when every
        # sector fetch failed.
        wrap = pd.concat(objs, axis=0, ignore_index=True)[CDCOL.keys()]
        wrap["IDX_NM_KOR"] = wrap["IDX_NM_KOR"].str.replace("WI26 ", "")
        wrap = wrap.rename(columns=CDCOL).set_index("ticker")

        # Index '2001' is the code this notebook uses for KOSDAQ (see Indices);
        # '2203' and '1028' are presumably large-cap indices - TODO confirm.
        kq = stock.get_index_portfolio_deposit_file('2001')
        lg = stock.get_index_portfolio_deposit_file('2203') \
           + stock.get_index_portfolio_deposit_file('1028')
        # Keep only tickers that are present in the classification table
        kq = [ticker for ticker in kq if ticker in wrap.index]
        lg = [ticker for ticker in lg if ticker in wrap.index]
        # Constituents of index '2001' get a '*' suffix on their name
        wrap.loc[kq, 'name'] = wrap.loc[kq, 'name'] + '*'
        wrap.loc[lg, 'stockSize'] = 'large'
        super().__init__(wrap)
        return

    def dump(self) -> str:
        """Serialize to JSON (orient='index'), write it to PATH.GROUP unless
        that path is a URL, and return the JSON string.

        Every occurrence of the substring "nan" is removed from the output.
        """
        string = self.to_json(orient='index').replace("nan", "")
        if not PATH.GROUP.startswith('http'):
            with open(PATH.GROUP, 'w') as f:
                f.write(string)
        return string


class Indices(DataFrame):
    """Date-indexed table of index values, one column per index.

    With ``readonly=True`` the stored table is returned as is; otherwise each
    column is topped up with the values published since its last stored date.
    """

    def __init__(self, readonly:bool=True):
        """
        :param readonly: return the stored data without fetching updates
        """
        # NOTE(review): this reads with the default orient and expects a
        # 'date' column, while dump() writes orient='index' - confirm that
        # the file at PATH.INDEX round-trips.
        df = pd.read_json(PATH.INDEX)
        df['date'] = df['date'].dt.date
        df = df.set_index(keys='date')
        if readonly:
            super().__init__(df)
            return

        # The last stored row is discarded before updating
        df = df.iloc[:-1]
        for col in df:
            # KOSPI / KOSDAQ follow the trading calendar; every other column
            # follows the WISE reference date
            latest = Calendar[-1] if col in ["KOSPI", "KOSDAQ"] else GENTIME
            index = df[col].dropna()
            # Column already up to date
            if latest == index.index[-1]:
                continue

            if col in ["KOSPI", "KOSDAQ"]:
                ticker = {'KOSPI':'1001', 'KOSDAQ':'2001'}[col]
                fetch = stock.get_index_ohlcv_by_date(
                    ticker=ticker,
                    fromdate=str(index.index[-1]),
                    todate=str(Calendar[-1]),
                    name_display=False
                ).rename(columns={'종가': col})
                # Align the index type with df (plain dates)
                fetch.index = fetch.index.date
            else:
                fetch = fetchWiseSeries(
                    code=col,
                    fromDT=str(index.index[-1]),
                    endDT=GENTIME.strftime("%Y-%m-%d")
                )
            # Copy value by value; dates missing from df are added as new rows
            for i in fetch.index:
                df.loc[i, col] = fetch.loc[i, col]
        super().__init__(df)
        return

    def dump(self) -> str:
        """Serialize to JSON (orient='index'), write it to PATH.INDEX unless
        that path is a URL, and return the JSON string.

        Every occurrence of the substring "nan" is removed from the output.
        """
        string = self.to_json(orient='index').replace("nan", "")
        if not PATH.INDEX.startswith('http'):
            with open(PATH.INDEX, 'w') as f:
                f.write(string)
        return string

## FN

### core.py

In [None]:
def str2num(src:str) -> Union[int, float]:
    """Parse a numeric string into ``int`` or ``float``.

    The text is first parsed as is (``float`` when it contains a '.',
    ``int`` otherwise). If that fails, every character other than digits,
    '.' and '-' is dropped (e.g. thousands separators) and parsing is retried.

    :param src: text to parse; ``None`` and the empty string are allowed
    :return: the parsed number, or ``numpy.nan`` when *src* is empty or no
             number can be recovered. The missing-value marker is always
             ``nan`` (previously some paths returned ``None``), so results
             are safe to pass to ``numpy.isnan``.
    """
    if not src:
        return np.nan
    try:
        return float(src) if "." in src else int(src)
    except ValueError:
        cleaned = "".join(c for c in src if c.isdigit() or c in ".-")
        try:
            return float(cleaned) if "." in cleaned else int(cleaned)
        except ValueError:
            # Nothing numeric left ("", ".", "-") or still malformed ("1-2")
            return np.nan


### fetch.py

In [None]:
def fetchIpo() -> DataFrame:
    """Download the listed-company table and return selected columns.

    :return: frame indexed by 6-character ticker with the columns name, ipo,
             products and settlementMonth. When one of the handled errors
             occurs, an empty frame carrying the renamed column labels is
             returned instead.
    """
    CDIPO = {
        '회사명':'name', '종목코드':'ticker',
        '상장일':'ipo', '주요제품':'products', '결산월':'settlementMonth'
    }
    _url = 'http://kind.krx.co.kr/corpgeneral/corpList.do?method=download'
    try:
        page = io.StringIO(requests.get(_url).text)
        first_table = pd.read_html(io=page, encoding='euc-kr')[0]
        listed = first_table[CDIPO.keys()].rename(columns=CDIPO)
        listed = listed.set_index(keys='ticker').copy()
        # Tickers come back numeric; restore the zero-padded string form
        listed.index = listed.index.astype(str).str.zfill(6)
        return listed
    except (KeyError, RecursionError, JSONDecodeError, SSLError):
        return DataFrame(columns=CDIPO.values())

def fetchOverviewSpec(ticker:str) -> Series:
    """Read price and consensus figures for *ticker* from its snapshot XML.

    :return: Series named *ticker* with the keys high52, low52, beta,
             floatShares, estPrice and estEps, each converted by str2num;
             elements missing from the XML are passed to str2num as None
    """
    url = f"http://cdn.fnguide.com/SVO2/xml/Snapshot_all/{ticker}.xml"
    root = ET.fromstring(Web.text(url, encoding='euc-kr'))
    tags = {
        # 'date': 'price/date',
        'high52': 'price/high52week',
        'low52': 'price/low52week',
        'beta': 'price/beta',
        'floatShares': 'price/ff_sher_rt',
        'estPrice': 'consensus/target_price',
        'estEps': 'consensus/eps'
    }
    nodes = {key: root.find(path) for key, path in tags.items()}
    data = {
        key: (node.text if node is not None else None)
        for key, node in nodes.items()
    }
    return Series(data=data, name=ticker).apply(str2num)

def fetchOverviewStatement(
        ticker:str,
        report:str='consolidated',
        period:str='annual',
        include_estimated:bool=True
    ) -> DataFrame:
    """Read the financial-highlight table for *ticker* from its snapshot XML.

    :param report: must start with 'con' or 'sep' (case-insensitive)
    :param period: must start with 'a' or 'q' (case-insensitive)
    :param include_estimated: when False, rows whose label ends with '(E)'
                              are removed
    :return: frame indexed by the record dates with one column per field,
             all cells converted by str2num; an empty frame when the XML has
             no matching section
    :raises KeyError: for an invalid *report* or *period*
    """
    report_key, period_key = report.lower(), period.lower()
    if not report_key.startswith(('con', 'sep')):
        raise KeyError(f'Invalid Argument for @report: {report}')
    if not period_key.startswith(('a', 'q')):
        raise KeyError(f'Invalid Argument for @period: {period}')

    ifrs = 'D' if report_key.startswith('con') else 'B'
    stamp = 'annual' if period_key.startswith('a') else 'quarter'
    tag = f'financial_highlight_ifrs_{ifrs}/financial_highlight_{stamp}'

    url = f"http://cdn.fnguide.com/SVO2/xml/Snapshot_all/{ticker}.xml"
    root = ET.fromstring(Web.text(url, encoding='euc-kr'))
    if root.find(tag) is None:
        return DataFrame()

    cols = [field.text for field in root.findall(f'{tag}/field')]
    index, data = [], []
    for record in root.findall(f'{tag}/record'):
        index.append(record.find('date').text)
        data.append([value.text for value in record.findall('value')])
    table = DataFrame(index=index, columns=cols, data=data)
    if not include_estimated:
        is_estimate = table.index.str.endswith('(E)')
        table = table[~is_estimate]
    return table.applymap(str2num)


# Sample ticker for the manual checks below (kept commented out so the cell
# performs no network request when run).
ticker = '005930'
# fetchOverviewSpec(ticker)
# fetchOverviewStatement(ticker, 'con', 'q', False)


### generic.py

In [None]:
class Stat(DataFrame):
    """Per-ticker fundamentals table stored at PATH.STATE.

    Rows are tickers (6-character strings); columns are the fields produced
    by fetchOverviewSpec plus the trailing*/fiscal*/debtRatio fields filled
    in by update().
    """

    def __init__(self):
        read = pd.read_json(PATH.STATE, orient='index')
        # Restore tickers as 6-character, zero-padded strings
        read.index = read.index.astype(str).str.zfill(6)
        super().__init__(read)
        return

    def update(self, tickers:Iterable=None):
        """Re-fetch the fundamentals of *tickers* and store them in place.

        :param tickers: tickers to refresh; ``None`` or an empty iterable
                        refreshes every ticker already in the table
        """
        # ``not tickers`` is ambiguous for arrays / pandas indexes, so the
        # emptiness test is done on a materialized list.
        selected = [] if tickers is None else list(tickers)
        if not selected:
            selected = list(self.index)

        for ticker in selected:
            base = fetchOverviewSpec(ticker)

            # The first quarterly record is dropped before aggregating
            ifrs = 'con'
            quarter = fetchOverviewStatement(ticker, ifrs, 'q', False).iloc[1:]
            if quarter.empty:
                self.loc[ticker, base.index] = base.values
                continue

            # Fall back to the separate statement when the consolidated one
            # carries no debt ratio. pd.isna also covers None.
            if pd.isna(quarter.iloc[-1]['부채비율(%)']):
                ifrs = 'sep'
                quarter = fetchOverviewStatement(ticker, ifrs, 'q', False).iloc[1:]
                if quarter.empty:
                    # No usable statement at all: store the spec fields only
                    self.loc[ticker, base.index] = base.values
                    continue

            base['trailingSales'] = quarter[quarter.columns[0]].sum()
            base['trailingEarning'] = quarter['영업이익(억원)'].sum()
            base['trailingNetIncome'] = quarter['당기순이익(억원)'].sum()
            base['trailingEarningRatio'] = round(quarter['영업이익률(%)'].mean(), 2)
            base['trailingEps'] = quarter['EPS(원)'].sum()

            annual = fetchOverviewStatement(ticker, ifrs, 'a', False)
            if not annual.empty:
                # Fiscal figures describe the latest fiscal year only. The
                # earning and net-income fields used to sum every reported
                # year, which was inconsistent with the other fiscal fields.
                fiscal = annual.iloc[-1]
                base['fiscalSales'] = fiscal[annual.columns[0]]
                base['fiscalEarning'] = fiscal['영업이익(억원)']
                base['fiscalNetIncome'] = fiscal['당기순이익(억원)']
                base['fiscalEarningRatio'] = fiscal['영업이익률(%)']
                base['fiscalEps'] = fiscal['EPS(원)']
                base['fiscalDividends'] = fiscal['배당수익률(%)']
            base['debtRatio'] = quarter.iloc[-1]['부채비율(%)']
            self.loc[ticker] = base
        return

    def dump(self) -> str:
        """Serialize to JSON (orient='index'), write it to PATH.STATE unless
        that path is a URL, and return the JSON string.

        Every occurrence of the substring "nan" is removed from the output.
        """
        string = self.to_json(orient='index').replace("nan", "")
        if not PATH.STATE.startswith('http'):
            with open(PATH.STATE, 'w') as f:
                f.write(string)
        return string

# df = Stat()
# df.update(['395400'])
# df.update()
# df.dump()

## KRX

### fetch.py

In [None]:
def fetchKonex() -> list:
    """Return the tickers listed on the KONEX market on the calendar's last
    trading day, or an empty list when one of the handled errors occurs.
    """
    try:
        konex = stock.get_market_cap_by_ticker(
            date=str(Calendar),
            market='KONEX',
        )
        return konex.index.tolist()
    except (KeyError, RecursionError, JSONDecodeError, SSLError):
        return []

def fetchMarketCap() -> DataFrame:
    """Return close, marketCap, volume, amount and shares for every ticker on
    the calendar's last trading day.

    The index is named 'ticker'. When one of the handled errors occurs, an
    empty frame carrying the same column labels is returned.
    """
    CDCAP = {
        '종가':'close', '시가총액':'marketCap',
        '거래량':'volume', '거래대금':'amount', '상장주식수':'shares'
    }
    try:
        fetched = stock.get_market_cap_by_ticker(
            date=str(Calendar),
            market='ALL',
        )
        renamed = fetched.rename(columns=CDCAP)
        renamed.index.name = 'ticker'
        return renamed
    except (KeyError, RecursionError, JSONDecodeError, SSLError):
        return DataFrame(columns=CDCAP.values())

def fetchKrxMultiples() -> DataFrame:
    """Return the PER, PBR and DIV columns for every ticker on the calendar's
    last trading day.

    The index is named "ticker". When one of the handled errors occurs, an
    empty frame carrying the same column labels is returned.
    """
    COLS = ['PER', 'PBR', 'DIV']
    try:
        fundamentals = stock.get_market_fundamental(
            date=str(Calendar),
            market="ALL",
        )
        multiples = fundamentals[COLS]
        multiples.index.name = "ticker"
        return multiples
    except (KeyError, RecursionError, JSONDecodeError, SSLError):
        return DataFrame(columns=COLS)

def fetchForeignRate() -> Series:
    """Return the "지분율" column of the foreign-investment exhaustion table
    for every ticker on the calendar's last trading day.

    :return: float Series named 'foreignRate', indexed by 'ticker' and
             rounded to 2 decimals. When one of the handled errors occurs an
             empty Series of the same name is returned, so the return type
             matches the annotation on both paths (the failure path used to
             return a DataFrame).
    """
    try:
        rate = stock.get_exhaustion_rates_of_foreign_investment(
            date=str(Calendar),
            market='ALL'
        )["지분율"]
        rate.index.name, rate.name = 'ticker', 'foreignRate'
        return round(rate.astype(float), 2)
    except (KeyError, RecursionError, JSONDecodeError, SSLError):
        empty = Series(name='foreignRate', dtype=float)
        empty.index.name = 'ticker'
        return empty

def fetchReturns(tickers:Iterable=None) -> DataFrame:
    """Return the percentage price change of each ticker over the calendar's
    standard look-back intervals (one column per interval).

    Tickers whose number of listed shares is identical at 'D-0' and 'Y-1'
    are computed from market-wide closing prices of the two dates. All other
    tickers are computed one by one from their own price history, fetched
    without passing the market-wide call's arguments.

    :param tickers: restrict the result to these tickers; ``None`` keeps all
    """
    def _base_return() -> DataFrame:
        # Market-wide close on the last trading day vs. each look-back date
        _objs = {}
        _base = stock.get_market_ohlcv_by_ticker(date=str(Calendar), market="ALL")['종가']
        for interval, date in Calendar:
            _fetch = stock.get_market_ohlcv_by_ticker(date=str(date), market="ALL")['종가']
            _objs[interval] = round(100 * (_base / _fetch - 1), 2)
        return pd.concat(objs=_objs, axis=1)

    def _update_return(_tickers:Iterable) -> DataFrame:
        # Per-ticker returns from the ticker's own price history
        fromdate, todate = Calendar['Y-2'].strftime("%Y%m%d"), str(Calendar)
        intervals = list(Calendar)
        objs = []
        for ticker in _tickers:
            src = stock.get_market_ohlcv_by_date(ticker=ticker, fromdate=fromdate, todate=todate)['종가']
            obj = {"ticker": ticker}
            for interval, date in intervals:
                src_copy = src[src.index.date >= date]
                if src_copy.empty:
                    # No price on or after the look-back date
                    obj[interval] = np.nan
                    continue
                obj[interval] = round(100 * (src_copy.iloc[-1] / src_copy.iloc[0] - 1), 2)
            objs.append(obj)
        if not objs:
            # Without rows there is no 'ticker' column to index by; return an
            # empty frame of the expected layout instead of failing.
            empty = DataFrame(columns=[interval for interval, _ in intervals])
            empty.index.name = 'ticker'
            return empty
        return DataFrame(objs).set_index(keys='ticker')

    _shares = pd.concat({dt: stock.get_market_cap_by_ticker(
        date=str(Calendar[dt]),
        market='ALL',
    )['상장주식수'] for dt in ['D-0', 'Y-1']}, axis=1)
    _shares = _shares[~_shares['D-0'].isna()]
    if tickers is not None:
        _shares = _shares[_shares.index.isin(tickers)]

    _normal = _shares[_shares['D-0'] == _shares['Y-1']].index
    _change = _shares[_shares['D-0'] != _shares['Y-1']].index
    _return = _base_return()
    return pd.concat([_return[_return.index.isin(_normal)], _update_return(_change)])

### generic.py

In [None]:
class Price(DataFrame):
    """Per-ticker market snapshot: market cap, multiples, foreign ownership
    rate and period returns.

    With ``readonly=True`` the table is loaded from PATH.PRICE; otherwise it
    is rebuilt from the fetch* helpers for the calendar's last trading day.
    """

    def __init__(self, readonly:bool=True):
        """
        :param readonly: load the stored JSON instead of fetching
        """
        if readonly:
            read = pd.read_json(PATH.PRICE, orient='index')
            # Restore tickers as 6-character, zero-padded strings, as the
            # other stored tables do; otherwise the index does not match
            # theirs when the tables are joined.
            read.index = read.index.astype(str).str.zfill(6)
            super().__init__(read)
            return

        # readonly is always False here, so this switches logging on
        Log.active = not readonly
        Log.append(f"Fetching Market Specification @Date: {Calendar}\n")

        Log.append(f"... Market Cap: ")
        df1 = fetchMarketCap()
        Log.append("Failed\n" if df1.empty else "Success\n")

        Log.append(f"...Multiples: ")
        df2 = fetchKrxMultiples()
        Log.append("Failed\n" if df2.empty else "Success\n")

        Log.append(f"... Foreign Rate: ")
        df3 = fetchForeignRate()
        Log.append("Failed\n" if df3.empty else "Success\n")

        df = pd.concat([df1, df2, df3], axis=1)
        # Drop KONEX listings and tickers without any traded volume
        df = df[(~df.index.isin(fetchKonex())) & (df['volume']> 0)]
        if "marketCap" in df:
            # Keep the upper half of the market by capitalization
            df = df[df['marketCap'] >= df['marketCap'].median()]
        tickers = df.index.values

        Log.append(f"... Returns: ")
        df4 = fetchReturns(tickers)
        df = df.join(df4, how='left')
        Log.append("Failed\n" if df4.empty else "Success\n")

        super().__init__(df)
        return

    def dump(self) -> str:
        """Serialize to JSON (orient='index'), write it to PATH.PRICE unless
        that path is a URL, and return the JSON string.

        Every occurrence of the substring "nan" is removed from the output.
        """
        string = self.to_json(orient='index').replace("nan", "")
        # PATH defines PRICE; the former reference to PATH.PRICES raised
        # AttributeError on every call.
        if not PATH.PRICE.startswith('http'):
            with open(PATH.PRICE, 'w') as f:
                f.write(string)
        return string



# specs = Flux(readonly=False)
# specs.dump()

# SPEC

## core.py

In [None]:
# Columns aggregated as a size-weighted sum when tickers are grouped
CDWEIGHT = [
    'D-1',
    'W-1',
    'M-1',
    'M-3',
    'M-6',
    'Y-1',
    'beta',
    'trailingPE',
    'PBR',
    'foreignRate',
    'high52PR',
    'low52PR',
]

# Columns aggregated as a plain mean when tickers are grouped
CDMEAN = [
    'trailingEarningRatio',
    'fiscalDividends',
    'debtRatio',
]

# Real-estate (REIT) tickers handled explicitly: ticker -> display name
RELTs = Series(dict([
    ("088980", "맥쿼리인프라"),
    ("395400", "SK리츠"),
    ("365550", "ESR켄달스퀘어리츠"),
    ("330590", "롯데리츠"),
    ("348950", "제이알글로벌리츠"),
    ("293940", "신한알파리츠"),
    ("432320", "KB스타리츠"),
    ("094800", "맵스리얼티1"),
    ("357120", "코람코라이프인프라리츠"),
    ("448730", "삼성FN리츠"),
    ("451800", "한화리츠"),
    ("088260", "이리츠코크렙"),
    ("334890", "이지스밸류리츠"),
    ("377190", "디앤디플랫폼리츠"),
    ("404990", "신한서부티엔디리츠"),
    ("417310", "코람코더원리츠"),
    ("400760", "NH올원리츠"),
    ("396690", "미래에셋글로벌리츠"),
    ("350520", "이지스레지던스리츠"),
    ("481850", "신한글로벌액티브리츠"),
]))

def num2cap(x:int) -> str:
    """Format an amount given in units of 억 as "<N>조 <M>억".

    The 조 part (multiples of 10000) is omitted when it is zero; fractional
    parts are truncated.
    """
    jo, eok = divmod(x, 10000)
    jo, eok = int(jo), int(eok)
    if not jo:
        return f"{eok}억"
    return f"{jo}조 {eok}억"

def ceiling(data:DataFrame, name:str, ticker:str='', ceiling:str='') -> dict:
    """Aggregate the rows of *data* into a single group-level record.

    :param data: member rows; must provide 'size', 'volume' and every column
                 listed in CDWEIGHT and CDMEAN
    :param name: display name of the group
    :param ticker: identifier of the group; defaults to the first row's
                   'industryCode'
    :param ceiling: name of the parent group; defaults to the first row's
                    'sectorName'
    :return: dict with ticker, name, marketCap, size, volume, ceiling, meta,
             the size-weighted sum of each CDWEIGHT column and the mean of
             each CDMEAN column (all rounded to 2 decimals)
    """
    total = data['size'].sum()
    weight = data['size'] / total
    if not ticker:
        ticker = data.iloc[0]['industryCode']
    if not ceiling:
        ceiling = data.iloc[0]['sectorName']

    cap_label = num2cap(total)
    obj = {
        'ticker': ticker,
        'name': name,
        'marketCap': cap_label,
        'size': total,
        'volume': data['volume'].sum(),
        'ceiling': ceiling,
        'meta': name + '<br>시가총액: ' + cap_label + '원<br>'
    }
    obj.update({col: round((data[col] * weight).sum(), 2) for col in CDWEIGHT})
    obj.update({col: round(data[col].mean(), 2) for col in CDMEAN})
    return obj

def grouping(*args) -> DataFrame:
    """Build one aggregated row per group.

    :param args: ``((name,), frame)`` pairs, i.e. the items produced by a
                 groupby whose keys are 1-tuples
    :return: frame of ceiling() records indexed by 'ticker'
    """
    records = []
    for (name, ), group in args:
        records.append(ceiling(group, name))
    return DataFrame(records).set_index(keys='ticker')

## color.py

In [None]:
from typing import List, Union


# 7-step colour scales; index 3 is the neutral middle colour.

# Red -> dark neutral -> green
VALUE3 = [
    '#F63538',  # rgb(246, 53, 56)
    '#BF4045',  # rgb(191, 64, 69)
    '#8B444E',  # rgb(139, 68, 78)
    '#414554',  # rgb(65, 69, 84)
    '#35764E',  # rgb(53, 118, 78)
    '#2F9E4F',  # rgb(47, 158, 79)
    '#30CC5A',  # rgb(48, 204, 90)
]

# Bad (first) -> grey -> good (last)
VALUE1 = [
    '#FF0D0D',  # BAD
    '#FF4E11',
    '#FF8E15',
    '#A6A6A6',
    '#FAB733',
    '#ACB334',
    '#69B34C',  # GOOD
]

# Blue -> grey -> red
VALUE2 = [
    '#1861A8',  # rgb(24, 97, 168)
    '#228BE6',  # rgb(34, 139, 230)
    '#74C0FC',  # rgb(116, 192, 252)
    '#A6A6A6',  # rgb(166, 166, 166)
    '#FF8787',  # rgb(255, 135, 135)
    '#F03E3E',  # rgb(240, 62, 62)
    '#C92A2A',  # rgb(201, 42, 42)
]

# Fixed 7-point boundaries per column; columns not listed here get their
# boundaries from the data.
BOUND = {
    'Y-1': [-30, -20, -10, 0, 10, 20, 30],
    'M-6': [-24, -16, -8, 0, 8, 16, 24],
    'M-3': [-18, -12, -6, 0, 6, 12, 18],
    'M-1': [-10, -6.7, -3.3, 0, 3.3, 6.7, 10],
    'W-1': [-6, -4, -2, 0, 2, 4, 6],
    'D-1': [-3, -2, -1, 0, 1, 2, 3],
    'fiscalDividends': [0, 0, 0, 0, 1.5, 3.0, 4.5],
    'high52PR': [-30, -20, -10, 0, 2, 4, 6],
    # 'low52PR': [-30, -20, -10, 0, 10, 20, 30]
}

def HEX2RGB(x):
    """Split a '#RRGGBB' string into its (red, green, blue) integers."""
    red, green, blue = x[1:3], x[3:5], x[5:]
    return (int(red, 16), int(green, 16), int(blue, 16))


def DOT2LINE(x, x1, y1, x2, y2):
    """Evaluate at *x* the straight line through (x1, y1) and (x2, y2)."""
    slope = (y2 - y1) / (x2 - x1)
    return slope * (x - x1) + y1

def bounding(data:Series) -> list:
    """Return the 7 boundary values used to colour *data*.

    Columns listed in BOUND use their fixed boundaries. Any other column
    uses the values found at the 1/8 ... 7/8 positions of its sorted non-NaN
    values (count-based boundaries).

    :return: list of 7 boundaries, or an empty list when *data* holds no
             non-NaN value
    """
    if str(data.name) in BOUND:
        return BOUND[str(data.name)]
    align = data.dropna().sort_values(ascending=True)

    # Count-based boundaries. The positions must be derived from the length
    # of the NaN-free series they index into; using len(data) ran past the
    # end of ``align`` whenever the data contained NaN.
    length = len(align)
    if not length:
        return []
    index = [length * n // 8 for n in range(1, 8)]
    return align.iloc[index].tolist()

def paint(data:Series, scale:list) -> Series:
    """Map every value of *data* to a '#RRGGBB' colour.

    :param data: values to colour; the boundaries come from bounding(data)
    :param scale: 7 '#RRGGBB' colours, one per boundary
    :return: Series of colour strings aligned with *data*. Zero / NaN values
             get the middle colour ``scale[3]``, values at or below the first
             boundary get ``scale[0]``, values above the last boundary get
             ``scale[-1]`` and everything in between is linearly interpolated
             between the two neighbouring colours. The last element is always
             overwritten with '#C8C8C8'.
    """
    bound = bounding(data)

    def _paint(val:Union[int, float]) -> str:
        if (not val) or (str(val) == 'nan'):
            return scale[3]
        elif val <= bound[0]:
            return scale[0]
        elif val > bound[-1]:
            return scale[-1]
        # Locate the segment (bound[n], bound[n + 1]] containing val
        n = 0
        while n < len(bound) - 1:
            if bound[n] < val <= bound[n + 1]:
                break
            n += 1
        r1, g1, b1 = HEX2RGB(scale[n])
        r2, g2, b2 = HEX2RGB(scale[n + 1])
        r = DOT2LINE(val, bound[n], r1, bound[n + 1], r2)
        g = DOT2LINE(val, bound[n], g1, bound[n + 1], g2)
        b = DOT2LINE(val, bound[n], b1, bound[n + 1], b2)
        # Two hex digits per channel: without zero padding a channel below
        # 16 produced a 4- or 5-digit (invalid) colour string.
        return f'#{int(r):02X}{int(g):02X}{int(b):02X}'

    color = data.apply(_paint)
    if not color.empty:
        color.iloc[-1] = '#C8C8C8'
    return color


# def paint(data:DataFrame):
#     colored = DataFrame(index=data.index)
#     for col in MAP_KEYS:
#         if col in ['PER', 'PBR']:
#             continue
#         color = data[col].apply(color, args=(col,))
#         color.name = f'{col}-C'
#         colored = colored.join(color.astype(str), how='left')

#     for f in ['PBR', 'PER']:
#         scale = SCALE[::-1].copy()
#         value = data[data[f] != 0][f].dropna().sort_values(ascending=False)

#         v = value.tolist()
#         limit = [v[int(len(value) / 7) * i] for i in range(len(scale))] + [v[-1]]
#         _color = pd.cut(value, bins=limit[::-1], labels=scale, right=True)
#         _color.name = f"{f}-C"
#         colored = colored.join(_color.astype(str), how='left').fillna(scale[-1])
#         colored = colored.replace('nan', scale[-1])
#     colored = colored.fillna(SCALE[3])
#     for col in colored:
#         colored.at[colored.index[-1], col] = "#C8C8C8"
#     return data.join(colored, how='left')

## generic.py

In [None]:
class Basis(DataFrame):
    """Per-ticker specification table combining prices, classification and
    fundamentals, with derived ratio columns.

    Building an instance also refreshes and writes the Stat table.
    """

    def __init__(self, readonly:bool=False):
        """
        :param readonly: forwarded to Price; the classification (Groups) is
                         always loaded from its stored file
        """
        bs = Price(readonly=readonly) \
             .join(Groups(readonly=True))

        # Preferred stocks, suspended stocks, and other stocks
        # that are not classified due to various reasons are not included.
        # Real estate (REITs) stocks are not provided by default,
        # So they are included at our discretion.
        dump = bs[bs['sectorCode'].isna() | bs['industryCode'].isna()]
        dump = dump[~dump.index.isin(RELTs.index)]
        bs = bs[~bs.index.isin(dump.index)].copy()
        # Only REITs that are actually present in the price table can be
        # labelled; assigning through .loc with absent tickers raises KeyError.
        reits = RELTs[RELTs.index.isin(bs.index)]
        if not reits.empty:
            bs.loc[reits.index, 'name'] = reits.values
            bs.loc[reits.index, 'sectorCode'] = 'G99'
            bs.loc[reits.index, 'industryCode'] = 'WI999'
            bs.loc[reits.index, ['sectorName', 'industryName']] = "부동산"

        # State update
        # fifty-two week price is daily updated (close-wise)
        # new tickers are also updated daily (full-state)
        st = Stat()
        new = [t for t in bs.index if not t in st.index]
        if new:
            st.update(new)
        # Tickers kept in the stored state but absent from today's price
        # table get NaN as close (a direct lookup raised KeyError); max/min
        # below then leave their stored 52-week values unchanged.
        st['close'] = bs['close'].reindex(st.index).to_numpy()
        st['high52'] = st.apply(lambda row: max(row['high52'], row['close']), axis=1)
        st['low52'] = st.apply(lambda row: min(row['low52'], row['close']), axis=1)
        st.drop(columns=["close"], inplace=True)
        st.dump()
        bs = bs.join(st)

        # Refine
        # Size is calculated by dividing the market capitalization
        # by 100 million KRW. The keyword 'PR' stands for 'Price Ratio',
        # which represents the rate of change compared to the reference price.
        # 'PE' stands for 'Price Earnings', indicating the price-to-earnings
        # ratio. 'PS' stands for 'Price Sales', indicating the price-to-sales
        # ratio. 'Estimated' refers to estimated values, while 'trailing'
        # refers to values based on the most recent four consecutive quarters.
        # 'Fiscal' refers to values confirmed for the last fiscal year.
        bs['size'] = bs['marketCap'] / 100000000
        bs['marketCap'] = bs['size'].apply(num2cap)
        bs['high52PR'] = round(100 * (bs['close'] / bs['high52'] - 1), 2)
        bs['low52PR'] = round(100 * (bs['close'] / bs['low52'] - 1), 2)
        bs['estimatedPR'] = round(100 * (bs['close'] / bs['estPrice'] - 1), 2)
        bs['estimatedPE'] = round(bs['close'] / bs['estEps'], 2)
        bs['trailingPS'] = round(bs['size'] / bs['trailingSales'], 2)
        bs['trailingPE'] = round(bs['close'] / bs['trailingEps'], 2)
        bs['fiscalPE'] = round(bs['close'] / bs['fiscalEps'], 2)
        bs = bs[[
            'name',  'close', 'marketCap', 'foreignRate', 'volume',
            'sectorCode', 'industryCode', 'sectorName', 'industryName',
            'D-1', 'W-1', 'M-1', 'M-3', 'M-6', 'Y-1',
            'high52PR', 'low52PR', 'estimatedPR',
            'stockSize', 'beta', 'floatShares',
            'trailingPS', 'trailingPE', 'fiscalPE', 'estimatedPE', 'PBR',
            'trailingEarningRatio', 'fiscalEarningRatio', 'fiscalDividends',
            'trailingSales', 'trailingEarning', 'trailingNetIncome',
            'fiscalSales', 'fiscalEarning', 'fiscalNetIncome',
            'debtRatio', 'size'
        ]]
        super().__init__(bs)
        return

    def dump(self) -> str:
        """Serialize to JSON (orient='index'), write it to PATH.SPECS unless
        that path is a URL, and return the JSON string.

        Every occurrence of the substring "nan" is removed from the output.
        """
        string = self.to_json(orient='index').replace("nan", "")
        if not PATH.SPECS.startswith('http'):
            with open(PATH.SPECS, 'w') as f:
                f.write(string)
        return string


class MarketMap(DataFrame):
    """
    Flat table behind the market treemap.

    It stacks the large-cap stock rows together with the aggregated
    industry, sector and root rows of two hierarchies:

      * 'WS' - every large cap, rooted at the row labelled 'TOPWS'
      * 'NS' - large caps without ticker '005930', rooted at 'TOPNS'

    The 'ceiling' column carries the name of each row's parent; the root
    rows have an empty parent. After construction, ``WS`` and ``NS`` hold
    the index labels that belong to each hierarchy.

    The aggregated rows are produced by the file-level helpers
    ``grouping`` and ``ceiling``.
    NOTE(review): ``grouping`` presumably returns one row per group, in
    group order - the sector labels assigned below rely on that; confirm.
    """

    # Index labels of each hierarchy; rebound per instance in __init__.
    WS, NS = [], []

    def __init__(self, basis:DataFrame=DataFrame()):
        """
        :param basis: per-ticker specification table indexed by ticker.
                      When empty, the table is read from PATH.SPECS and its
                      index is restored to zero-padded 6-character strings.
                      The passed frame itself is never modified.
        """
        if basis.empty:
            basis = pd.read_json(PATH.SPECS, orient='index')
            basis.index = basis.index.astype(str).str.zfill(6)

        # Work on a private copy of the traded rows so that the 'stockSize'
        # overwrite below does not leak into the caller's frame.
        basis = basis[basis['volume'] > 0].copy()
        basis.loc[basis['size'] > 10000, 'stockSize'] = 'large'
        # Re-stack group by group: rows come out ordered by industryCode
        # (groupby drops rows whose industryCode is missing).
        basis = pd.concat([df for _, df in basis.groupby(by='industryCode')])

        # Stock-level rows: parent is the industry, 'meta' is the hover text.
        large_ws = basis[basis['stockSize'] == 'large'].copy()
        large_ws['ceiling'] = large_ws['industryName']
        large_ws['meta'] = large_ws['name'] + '(' + large_ws.index + ')<br>' \
                         + '시가총액: ' + large_ws['marketCap'] + '원<br>' \
                         + '종가: ' + large_ws['close'].apply(lambda x: f"{x:,d}") + '원<br>'
        large_ns = large_ws[large_ws.index != '005930']

        ind_ws, sec_ws, top_ws = MarketMap._hierarchy(large_ws, '대형주', 'WS')
        ind_ns, sec_ns, top_ns = MarketMap._hierarchy(large_ns, '대형주(삼성전자 제외)', 'NS')

        # NOTE(review): the WS list starts from every traded ticker, whereas
        # the NS list starts from the large caps only. Kept as in the
        # original; filtering this table's index with either list yields the
        # large-cap rows anyway.
        ws_ids = basis.index.to_list() \
               + ind_ws.index.to_list() + sec_ws.index.to_list() + ['TOPWS']
        ns_ids = large_ns.index.to_list() \
               + ind_ns.index.to_list() + sec_ns.index.to_list() + ['TOPNS']

        super().__init__(
            pd.concat([
                large_ws, ind_ws, sec_ws, ind_ns, sec_ns, top_ws, top_ns
            ])[[
                'name', 'close', 'marketCap', 'size',
                'foreignRate', 'volume', 'ceiling',
                'D-1', 'W-1', 'M-1', 'M-3', 'M-6', 'Y-1',
                'high52PR', 'low52PR', 'estimatedPR',
                'beta', 'trailingPS', 'trailingPE',
                'fiscalPE', 'estimatedPE', 'PBR', 'trailingEarningRatio',
                'fiscalEarningRatio', 'fiscalDividends', 'trailingSales',
                'trailingEarning', 'trailingNetIncome', 'fiscalSales', 'fiscalEarning',
                'fiscalNetIncome', 'debtRatio', 'meta'
            ]]
        )

        self.WS = ws_ids
        self.NS = ns_ids

    @staticmethod
    def _hierarchy(stocks:DataFrame, section:str, suffix:str) -> tuple:
        """
        Build the industry, sector and root rows above the given stock rows.

        :param stocks: stock-level rows (must already carry 'ceiling')
        :param section: display name of the root, used as the sectors' parent
        :param suffix: hierarchy tag ('WS' / 'NS') appended to every new label
        :return: (industry rows, sector rows, single-row root frame)
        """
        by_industry = stocks.groupby(by=['industryName'])
        by_sector = stocks.groupby(by=['sectorName'])
        industries, sectors = grouping(*by_industry), grouping(*by_sector)

        # Drop industry rows that would be named exactly like their parent.
        industries = industries[industries['name'] != industries['ceiling']]
        sectors['ceiling'] = section

        industries.index = [f'{i}{suffix}' for i in industries.index]
        # The group key itself is not needed, so it is bound without
        # unpacking: it is a 1-tuple on recent pandas, a scalar on older ones.
        sectors.index = [
            f'{sec.iloc[0]["sectorCode"]}{suffix}' for _, sec in by_sector
        ]

        root_label = f'TOP{suffix}'
        top = ceiling(stocks, section, root_label, '')
        top['ceiling'] = ''
        top = DataFrame(index=[root_label], data=top)
        return industries, sectors, top


# Build the per-ticker specification table.
# NOTE(review): the meaning of the positional True flag is defined by
# Basis.__init__, which is not visible from here - confirm there.
basis = Basis(True)
# basis.dump()
# basis

# Derive the treemap table from it; both names are module-level globals
# that the cells below read.
marketMap = MarketMap(basis)
# marketMap

# PRODUCT TEST

## treemap

In [None]:
import plotly.graph_objects as go

# Selection for this preview: hierarchy tag, metric column and its unit.
typ = 'WS'
key = 'high52PR'
unit = '%'

# Keep only the rows whose index label is listed for the chosen hierarchy.
data = marketMap[marketMap.index.isin(getattr(marketMap, typ))]

# One font definition shared by the tile text and the hover label.
treemap_font = dict(
    family='NanumGothic, Nanum Gothic, Open Sans, sans-serif',
    color='#ffffff',
)

# Tiles are labelled by 'name', nested under 'ceiling' and sized by 'size';
# the selected metric is printed on each tile and in the hover box.
# NOTE(review): paint / VALUE2 are defined elsewhere in the file; presumably
# they map the metric values onto a color scale - confirm there.
trace = go.Treemap(
    branchvalues="total",
    labels=data['name'],
    parents=data['ceiling'],
    values=data['size'],
    text=data[key],
    textposition='middle center',
    textfont=treemap_font,
    texttemplate='%{label}<br>%{text:.2f}' + unit,
    meta=data['meta'],
    hovertemplate='%{meta}' + key + ': %{text}' + unit + '<extra></extra>',
    hoverlabel=dict(font=treemap_font),
    opacity=0.9,
    marker=dict(colors=paint(data[key], VALUE2)),
)

treemap_layout = go.Layout(margin=dict(l=10, r=10, t=10, b=25))
fig = go.Figure(data=trace, layout=treemap_layout)
fig.show()

## bar

In [None]:
import plotly.graph_objects as go



# ANALYSIS

In [None]:
def bounding(data:Series) -> list:
    """
    Return the 7 boundaries that cut *data* into 8 value bands.

    When the series name is registered in BOUND, the preset boundaries are
    returned unchanged. Otherwise the boundaries are spaced evenly over a
    range that always contains zero, i.e. [min(data.min(), 0),
    max(data.max(), 0)]:

      * non-negative data -> max * n / 8, n = 1..7
      * non-positive data -> boundaries between the minimum and 0
      * mixed data        -> boundaries between the minimum and the maximum

    :param data: numeric series; missing values are skipped by min()/max()
    :return: list of 7 non-decreasing boundaries (NaN when *data* holds no
             valid value)
    """
    label = str(data.name)
    if label in BOUND:
        return BOUND[label]

    # Count-based alternative (not used): pick the values found at the n/8
    # positions of the sorted, NaN-free series.

    # Value-based. Scaling the maximum alone collapses all boundaries to 0
    # for a series that never rises above zero (e.g. a drawdown from a
    # high), so the range is extended to the side the data really occupies.
    lower = min(data.min(), 0)
    upper = max(data.max(), 0)
    return [lower + (upper - lower) * n / 8 for n in range(1, 8)]


import plotly.graph_objects as go
import plotly.figure_factory as ff

def gaussian(x, mu, sigma):
    """
    Evaluate the normal probability density with mean *mu* and standard
    deviation *sigma* at *x*.

    :param x: scalar or array-like supporting NumPy arithmetic
    :param mu: mean of the distribution
    :param sigma: standard deviation of the distribution
    :return: density value(s), same shape as *x*
    """
    peak = 1 / (sigma * np.sqrt(2 * np.pi))
    z = (x - mu) / sigma
    return peak * np.exp(-0.5 * z ** 2)

# Metric under study, taken from the treemap table.
data = marketMap['high52PR']

# 1. Put every observation on the normal density curve fitted with the
#    sample mean and standard deviation.
tr = go.Scatter(
    x=data,
    y=gaussian(data, data.mean(), data.std()),
    mode='markers',
    meta=data,
    hovertemplate='%{meta}<extra></extra>',
)
fig = go.Figure(data=[tr])

# 2. Overlay the band boundaries as dotted vertical lines.
partitions = bounding(data)
print(partitions)
for boundary in partitions:
    fig.add_vline(x=boundary, line_dash='dot', line_width=1.0)

# 3. Layout settings
fig.update_layout(
    template='plotly_white',
    title='정규 분포 히스토그램 및 밀도 추정',
    xaxis_title='값',
    yaxis_title='빈도',
)

# 4. Show the figure
fig.show()

[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
