In [1]:
import datetime as dt
import numpy as np

import pandas as pd
from sklearn.linear_model import LinearRegression

In [2]:
# PPS
# Load the four input tables used by the rest of the notebook. `index_col=0`
# makes the first CSV column the row index; `low_memory=False` makes pandas
# infer each OHLCV column's dtype from the whole file instead of chunk by chunk.
# NOTE(review): no dtype is given for StockCode, and FUNDMANENTAL_PROCESSOR
# later zero-pads it to 6 characters — confirm that the StockCode dtype is the
# same in all four tables, otherwise the `.map(...)` lookups silently yield NaN.
info_df = pd.read_csv("./data/dynamic/pps_info_df.csv", index_col=0)
ohlcv_df = pd.read_csv("./data/dynamic/pps_ohlcv_df.csv", index_col=0, low_memory=False)
fundamental_df = pd.read_csv("./data/dynamic/pps_fundamental_df.csv", index_col=0)
sector_df = pd.read_csv("./data/dynamic/pps_sector_df.csv", index_col=0)

In [3]:
"""
Preprocessor
    - OHLCV_PREPROCESSOR
"""


class OHLCV_PREPROCESSOR:
    """Date-window and data-quality filter for a long-format OHLCV table.

    The class reads the columns ``Date``, ``StockCode`` and ``Volume``.
    The frame is copied on construction, so the caller's frame is never
    modified by this class.
    """

    def __init__(self, ohlcv_df) -> None:
        self.ohlcv_df = ohlcv_df.copy()
        self._format_date()

    def _format_date(self):
        """Parse the ``Date`` column of the private copy with ``pd.to_datetime``."""
        self.ohlcv_df["Date"] = pd.to_datetime(self.ohlcv_df["Date"])
        return None

    def get_date_filtered_ohlcv_df(self, start_date, end_date):
        """Return the cleaned rows with ``start_date < Date < end_date``.

        Both bounds are exclusive. After the date cut, rows containing any NaN
        are dropped, rows whose ``Volume`` is zero are dropped, and finally only
        stocks whose remaining row count equals the most common row count are
        kept (see ``_filter_cnt``).

        An empty window yields an empty frame instead of raising.
        """
        ohlcv_df = self.ohlcv_df
        in_window = (ohlcv_df["Date"] > start_date) & (ohlcv_df["Date"] < end_date)
        filtered_ohlcv_df = ohlcv_df[in_window]
        filtered_ohlcv_df = self._filter_na(filtered_ohlcv_df)
        filtered_ohlcv_df = self._filter_zero(filtered_ohlcv_df, "Volume")
        filtered_ohlcv_df = self._filter_cnt(filtered_ohlcv_df)
        return filtered_ohlcv_df

    @staticmethod
    def _filter_cnt(df):
        """Keep only stocks whose row count equals the modal row count.

        When several counts are tied for most common, ``Series.mode`` returns
        them sorted and the smallest one is used.
        """
        # Nothing to count: the previous implementation crashed here with
        # IndexError because the mode of an empty series has no first element.
        if df.empty:
            return df

        rows_per_stock = df.groupby("StockCode")["Date"].count()
        if rows_per_stock.empty:
            # Every StockCode was NaN, so groupby produced no groups.
            return df.iloc[0:0]

        modal_count = rows_per_stock.mode().values[0]
        complete_codes = rows_per_stock[rows_per_stock == modal_count].index.to_list()
        return df[df["StockCode"].isin(complete_codes)]

    @staticmethod
    def _filter_zero(df, column):
        """Drop rows whose ``column`` value, cast to int, equals zero."""
        return df[df[column].astype(int) != 0]

    # Backward-compatible alias for the original (misspelled) helper name.
    _filer_zero = _filter_zero

    @staticmethod
    def _filter_na(df):
        """Drop every row that contains at least one NaN."""
        return df.dropna()

In [4]:
"""
PROCESSOR
    - INFO_PROCESSOR
    - FUNDAMENTAL_PROCESSOR
    - SECTOR_PROCESSOR
    - NETPROFIT_PROCESSOR
    - COEFFICIENT_PROCESSOR    
"""


class INFO_PROCESSOR:
    """Per-stock lookups over the info table, keyed by ``StockCode``.

    The incoming frame is copied, then re-indexed by its ``StockCode`` column;
    every getter returns a ``{StockCode: value}`` dict for one column.
    """

    def __init__(self, info_df):
        self.info_df = info_df.copy()
        self._setindex_StockCode()

    def _setindex_StockCode(self):
        """Use the ``StockCode`` column as the index of the private copy."""
        self.info_df = self.info_df.set_index("StockCode")
        return None

    def _column_as_dict(self, column):
        """Return ``{StockCode: value}`` for the given info column."""
        return self.info_df[column].to_dict()

    def get_StockName_dict(self):
        """
        Stock name.
        """
        return self._column_as_dict("StockName")

    def get_VolumeTurnOverRatio_dict(self):
        """
        Volume turnover ratio.

        Rule of thumb recorded by the author: based on past validation, with
        50% as the baseline, 130% or more is regarded as a short-term warning
        level and around 20% as a bottom.
        """
        return self._column_as_dict("VolumeTurnOverRatio")

    def get_TotalShares_dict(self):
        """
        Number of listed shares (read from the ``TotalShare`` column).
        """
        return self._column_as_dict("TotalShare")


class FUNDMANENTAL_PROCESSOR:
    """Per-stock lookups over the long-format fundamentals table.

    The table is read through three columns: ``StockCode``, ``AccountName``
    (the Korean account label) and ``Amount``. On construction the frame is
    copied, ``StockCode`` is converted to a 6-character zero-padded string and
    then used as the index. Every getter returns ``{StockCode: Amount}`` for
    one account; if a stock has several rows for the same account, the last
    row wins (dict semantics).
    """

    def __init__(self, fundamental_df):
        self.fundamental_df = fundamental_df.copy()
        self._format_StockCode()
        self._setindex_StockCode()

    def _format_StockCode(self):
        """Convert ``StockCode`` to text and left-pad it with zeros to width 6."""
        fundamental_df = self.fundamental_df
        fundamental_df["StockCode"] = fundamental_df["StockCode"].astype(str).str.zfill(6)
        self.fundamental_df = fundamental_df
        return None

    def _setindex_StockCode(self):
        """Use the formatted ``StockCode`` column as the index."""
        self.fundamental_df = self.fundamental_df.set_index("StockCode")
        return None

    def _get_account_dict(self, account_name):
        """Return ``{StockCode: Amount}`` for rows whose AccountName matches exactly."""
        df = self.fundamental_df
        return df.loc[df["AccountName"] == account_name, "Amount"].to_dict()

    def get_NetProfit_dict(self):
        """
        Net income (account "당기순이익").
        """
        return self._get_account_dict("당기순이익")

    def get_TotalAssets_dict(self):
        """
        Total assets (account "자산총계").
        """
        return self._get_account_dict("자산총계")

    def get_TotalEquity_dict(self):
        """
        Total equity (account "자본총계").
        """
        return self._get_account_dict("자본총계")

    def get_TotalLiabilities_dict(self):
        """
        Total liabilities (account "부채총계").
        """
        return self._get_account_dict("부채총계")

    def get_CurrentAssets_dict(self):
        """
        Current assets (account "유동자산").
        """
        return self._get_account_dict("유동자산")

    def get_CurrentLiabilites_dict(self):
        """
        Current liabilities (account "유동부채").
        """
        return self._get_account_dict("유동부채")

    def get_OperationProfit_dict(self):
        """
        Operating profit (account "영업이익").
        """
        return self._get_account_dict("영업이익")


class SECTOR_PROCESSOR:
    """Per-stock market and sector lookups, keyed by ``StockCode``.

    The sector table is copied and re-indexed by its ``StockCode`` column.
    """

    def __init__(self, sector_df) -> None:
        self.sector_df = sector_df.copy()
        self._setindex_StockCode()

    def _setindex_StockCode(self):
        """Use the ``StockCode`` column as the index of the private copy."""
        self.sector_df = self.sector_df.set_index("StockCode")
        return None

    def get_Market_dict(self):
        """Return ``{StockCode: MarketName}``."""
        market_by_code = self.sector_df["MarketName"]
        return market_by_code.to_dict()

    def get_Sector_dict(self):
        """Return ``{StockCode: SectorName}``."""
        sector_by_code = self.sector_df["SectorName"]
        return sector_by_code.to_dict()


class NETPROFIT_PROCESSOR:
    """Per-stock simple return between the first and last ``Close`` in a frame.

    Despite the name, the value is a price return
    ``(last_close - first_close) / first_close``, not an accounting net profit.
    """

    def __init__(self, ohlcv_df) -> None:
        self.ohlcv_df = ohlcv_df

    def get_NetProfit_dict(self):
        """Return ``{StockCode: return}`` with rows ordered by ``Date`` per stock."""
        NetProfit_dict = (
            self.ohlcv_df.sort_values("Date")
            .groupby("StockCode")["Close"]
            .apply(self._get_netprofit)
            .to_dict()
        )
        return NetProfit_dict

    @staticmethod
    def _get_netprofit(values):
        """Return ``(values[-1] - values[0]) / values[0]``, or 0 when not computable.

        Best-effort by design: an empty or malformed price sequence yields 0.
        Only the failures that such input can produce are caught, so interrupts
        and genuine programming errors are no longer silently turned into 0.
        """
        try:
            prices = np.asarray(values)
            if prices.size == 0:
                return 0
            buying_price = prices[0]
            current_price = prices[-1]
            return (current_price - buying_price) / buying_price
        except (IndexError, TypeError, ValueError, ZeroDivisionError):
            return 0


class COEFFICIENT_PROCESSOR:
    """Per-stock linear trend of min-max scaled ``Close`` and ``Volume``.

    For each stock the series is ordered by ``Date``, scaled to [0, 1], and the
    ordinary-least-squares slope against the step number 1..n is reported.
    """

    def __init__(self, ohlcv_df):
        self.ohlcv_df = ohlcv_df.copy()

    def get_CloseCoef_dict(self):
        """Return ``{StockCode: slope}`` for the ``Close`` column."""
        return self._get_coef_dict("Close")

    def get_VolumeCoef_dict(self):
        """Return ``{StockCode: slope}`` for the ``Volume`` column."""
        return self._get_coef_dict("Volume")

    def _get_coef_dict(self, column):
        """Compute the scaled-trend slope of ``column`` for every stock."""
        return (
            self.ohlcv_df.sort_values("Date")
            .groupby("StockCode")[column]
            .apply(self._get_linear_coef)
            .to_dict()
        )

    @staticmethod
    def _get_linear_coef(values):
        """Return the OLS slope (with intercept) of min-max scaled ``values``.

        The slope is fitted against x = 1..n. Degenerate input returns 0.0
        explicitly: fewer than two points, any NaN/inf, or a constant series
        (whose trend is zero regardless of scaling). These cases used to reach
        0 only through a bare ``except`` around the regression fit.
        """
        series = np.asarray(values, dtype=float)
        if series.size < 2 or not np.isfinite(series).all():
            return 0.0

        low = series.min()
        span = series.max() - low
        if span == 0:
            return 0.0

        scaled = (series - low) / span
        steps = np.arange(1, series.size + 1, dtype=float)
        # Degree-1 least-squares fit; element 0 is the slope. This is the same
        # quantity a one-feature linear regression with intercept reports.
        slope = np.polyfit(steps, scaled, 1)[0]
        return float(slope)

In [5]:
"""
Analyser
    - TECHNICAL_ANALYSER
    - FUNDAMENTAL_ANALYSER
    - SECTOR_ANALYSER
"""


class TECHNICAL_ANALYSER:
    """Build one row of price/volume trend metrics per stock."""

    def __init__(self, ohlcv_df) -> None:
        self.ohlcv_df = ohlcv_df

    def get_technical_analysis_df(self, start_date, end_date):
        """Return a frame with StockCode, CloseCoef, VolumeCoef and NetProfit.

        The OHLCV data is first cut to the date window and cleaned by
        OHLCV_PREPROCESSOR; each metric is then looked up per StockCode.
        """
        window_df = self._get_filtered_ohlcv_df(self.ohlcv_df, start_date, end_date)
        analysis_df = self._get_raw_analysis_df(window_df)

        # Trend slopes of the scaled close price and volume.
        coefficient_processor = self._get_coefficient_processor(window_df)
        close_lookup = coefficient_processor.get_CloseCoef_dict()
        volume_lookup = coefficient_processor.get_VolumeCoef_dict()

        # First-to-last close return over the window.
        netprofit_processor = self._get_netprofit_processor(window_df)
        netprofit_lookup = netprofit_processor.get_NetProfit_dict()

        metric_lookups = (
            ("CloseCoef", close_lookup),
            ("VolumeCoef", volume_lookup),
            ("NetProfit", netprofit_lookup),
        )
        for column, lookup in metric_lookups:
            analysis_df[column] = analysis_df["StockCode"].map(lookup)

        return analysis_df

    @staticmethod
    def _get_filtered_ohlcv_df(ohlcv_df, start_date, end_date):
        """Delegate the date cut and cleaning to OHLCV_PREPROCESSOR."""
        return OHLCV_PREPROCESSOR(ohlcv_df).get_date_filtered_ohlcv_df(
            start_date=start_date, end_date=end_date
        )

    @staticmethod
    def _get_raw_analysis_df(ohlcv_df):
        """Return a fresh one-column frame holding each StockCode once."""
        first_rows = ohlcv_df.drop_duplicates("StockCode")
        return first_rows[["StockCode"]].copy()

    @staticmethod
    def _get_coefficient_processor(ohlcv_df):
        """Create the COEFFICIENT_PROCESSOR for the given frame."""
        return COEFFICIENT_PROCESSOR(ohlcv_df)

    @staticmethod
    def _get_netprofit_processor(ohlcv_df):
        """Create the NETPROFIT_PROCESSOR for the given frame."""
        return NETPROFIT_PROCESSOR(ohlcv_df)


class FUNDAMENTAL_ANALYSER:
    """Build per-stock valuation ratios (PER, PBR, DebtPCT).

    Daily closes inside the date window are combined with per-stock
    fundamentals and share counts; the daily ratios are then averaged per
    stock. None of the helpers modifies the frame it receives — each returns a
    new frame. This avoids chained-assignment warnings on the filtered slice.
    """

    def __init__(self, ohlcv_df, fundamental_df, info_df) -> None:
        self.ohlcv_df = ohlcv_df
        self.fundamental_df = fundamental_df
        self.info_df = info_df

    def get_fundamental_analysis_df(self, start_date, end_date):
        """Return one row per StockCode with the mean PER, PBR and DebtPCT."""
        filtered_ohlcv_df = self._get_filtered_ohlcv_df(self.ohlcv_df, start_date, end_date)

        filtered_ohlcv_df = self._map_fundamental_data(filtered_ohlcv_df, self.fundamental_df)
        filtered_ohlcv_df = self._map_info_data(filtered_ohlcv_df, self.info_df)

        filtered_ohlcv_df = self._append_PER(filtered_ohlcv_df)
        filtered_ohlcv_df = self._append_PBR(filtered_ohlcv_df)
        filtered_ohlcv_df = self._append_DebtPCT(filtered_ohlcv_df)

        fundamental_analysis_df = self._get_fundamental_analysis_df(filtered_ohlcv_df)
        return fundamental_analysis_df

    @staticmethod
    def _get_filtered_ohlcv_df(ohlcv_df, start_date, end_date):
        """Delegate the date cut and cleaning to OHLCV_PREPROCESSOR."""
        ohlcv_preprocessor = OHLCV_PREPROCESSOR(ohlcv_df)
        filtered_ohlcv_df = ohlcv_preprocessor.get_date_filtered_ohlcv_df(
            start_date=start_date, end_date=end_date
        )
        return filtered_ohlcv_df

    @staticmethod
    def _map_info_data(ohlcv_df, info_df):
        """Return a copy with ``TotalShare`` looked up per StockCode."""
        info_processor = INFO_PROCESSOR(info_df)
        TotalShares_dict = info_processor.get_TotalShares_dict()
        return ohlcv_df.assign(TotalShare=ohlcv_df["StockCode"].map(TotalShares_dict))

    @staticmethod
    def _map_fundamental_data(ohlcv_df, fundamental_df):
        """Return a copy with NetProfit, TotalAssets and TotalLiabilities added.

        Stocks missing from the fundamentals table get NaN in these columns.
        """
        fundmanental_processor = FUNDMANENTAL_PROCESSOR(fundamental_df)
        stock_codes = ohlcv_df["StockCode"]
        return ohlcv_df.assign(
            # Net income
            NetProfit=stock_codes.map(fundmanental_processor.get_NetProfit_dict()),
            # Total assets
            TotalAssets=stock_codes.map(fundmanental_processor.get_TotalAssets_dict()),
            # Total liabilities
            TotalLiabilities=stock_codes.map(fundmanental_processor.get_TotalLiabilities_dict()),
        )

    @staticmethod
    def _append_PER(ohlcv_df):
        """Return a copy with ``PER = Close * TotalShare / NetProfit``."""
        market_cap = ohlcv_df["Close"] * ohlcv_df["TotalShare"]
        return ohlcv_df.assign(PER=market_cap / ohlcv_df["NetProfit"])

    @staticmethod
    def _append_PBR(ohlcv_df):
        """Return a copy with ``PBR = Close * TotalShare / (TotalAssets - TotalLiabilities)``."""
        market_cap = ohlcv_df["Close"] * ohlcv_df["TotalShare"]
        book_value = ohlcv_df["TotalAssets"] - ohlcv_df["TotalLiabilities"]
        return ohlcv_df.assign(PBR=market_cap / book_value)

    @staticmethod
    def _append_DebtPCT(ohlcv_df):
        """Return a copy with ``DebtPCT = TotalLiabilities / TotalAssets``."""
        return ohlcv_df.assign(DebtPCT=ohlcv_df["TotalLiabilities"] / ohlcv_df["TotalAssets"])

    @staticmethod
    def _get_fundamental_analysis_df(ohlcv_df):
        """Average PER, PBR and DebtPCT per StockCode over rows without any NaN.

        The input frame is left untouched; incomplete rows are dropped on a copy.
        """
        complete_rows = ohlcv_df.dropna()
        fundamental_analysis_df = (
            complete_rows.groupby("StockCode")[["PER", "PBR", "DebtPCT"]].mean().reset_index()
        )
        return fundamental_analysis_df


class SECTOR_ANALYSER:
    """Aggregate per-stock analysis frames to (Market, Sector) means.

    Side effect to be aware of: ``_map_sector_data`` writes the ``Market`` and
    ``Sector`` columns onto the frame it is given, so a per-stock frame passed
    to any public method here gains those two columns.
    """

    def __init__(self, sector_df) -> None:
        self.sector_df = sector_df

    def get_sector_technical_analysis_df(self, technical_analysis_df):
        """Label the technical frame with Market/Sector and average per sector."""
        labelled_df = self._map_sector_data(technical_analysis_df, self.sector_df)
        return self._get_sector_analysis_df(labelled_df)

    def get_sector_fundamental_analysis_df(self, fundamental_analysis_df):
        """Label the fundamental frame with Market/Sector and average per sector."""
        labelled_df = self._map_sector_data(fundamental_analysis_df, self.sector_df)
        return self._get_sector_analysis_df(labelled_df)

    def get_sector_analysis_df(self, fundamental_analysis_df, technical_analysis_df):
        """Inner-join both per-stock frames on StockCode, then average per sector."""
        merged_df = self._merge_analysis_df(fundamental_analysis_df, technical_analysis_df)
        labelled_df = self._map_sector_data(merged_df, self.sector_df)
        return self._get_sector_analysis_df(labelled_df)

    @staticmethod
    def _merge_analysis_df(a_df, b_df):
        """Inner-merge two frames on their ``StockCode`` column."""
        return pd.merge(a_df, b_df, how="inner", on="StockCode")

    @staticmethod
    def _map_sector_data(analysis_df, sector_df):
        """Add ``Market`` and ``Sector`` to ``analysis_df`` in place and return it."""
        sector_processor = SECTOR_PROCESSOR(sector_df)
        label_lookups = (
            ("Market", sector_processor.get_Market_dict()),
            ("Sector", sector_processor.get_Sector_dict()),
        )
        for column, lookup in label_lookups:
            analysis_df[column] = analysis_df["StockCode"].map(lookup)
        return analysis_df

    @staticmethod
    def _get_sector_analysis_df(analysis_df):
        """Drop ``StockCode`` and return the column means per (Market, Sector)."""
        without_codes = analysis_df.drop(columns=["StockCode"])
        grouped = without_codes.groupby(["Market", "Sector"])
        return grouped.mean()

In [6]:
"""
Analysis Filter
    - SECTOR_FUNDAMENTAL_ANALYSIS_FILTER
    - SECTOR_TECHNICAL_ANALYSIS_FILTER
"""


class SECTOR_FUNDAMENTAL_ANALYSIS_FILTER:
    """Select sectors whose mean PER, PBR and DebtPCT fall inside fixed bands.

    Calling the instance returns the index labels of the sectors that satisfy
    all three bands. Every band is open on both sides (strict inequalities).
    """

    def __init__(self, sector_fundamental_analysis_df) -> None:
        self.sector_fundamental_analysis_df = sector_fundamental_analysis_df

    def __call__(self):
        df = self.sector_fundamental_analysis_df

        # (lower, upper) per metric. Quantile-based bounds are clamped by the
        # fixed constants: lower bounds never go below the constant, upper
        # bounds never above it.
        bands = {
            # Positive PER below the largest sector PER (the maximum itself is excluded).
            "PER": (0, df["PER"].max()),
            "PBR": (
                max(df["PBR"].quantile(0.05), 0.6),
                min(df["PBR"].quantile(0.5), 2),
            ),
            "DebtPCT": (
                max(df["DebtPCT"].quantile(0.1), 0.15),
                min(df["DebtPCT"].quantile(0.7), 0.4),
            ),
        }

        passes_all = None
        for column, (lower_limit, upper_limit) in bands.items():
            passes = self.filter_by_limit(df, column, lower_limit, upper_limit)
            passes_all = passes if passes_all is None else passes_all & passes

        return df.loc[passes_all, :].index

    @staticmethod
    def filter_by_limit(df, column, lower_limit, upper_limit):
        """Return a boolean mask for ``lower_limit < df[column] < upper_limit``."""
        values = df[column]
        return values.gt(lower_limit) & values.lt(upper_limit)


class SECTOR_TECHNICAL_ANALYSIS_FILTER:
    """Rank fundamentally acceptable sectors by a technical score.

    Calling the instance with the sectors that passed the fundamental filter
    returns the top third of them (at least one) by ``Score``.
    """

    def __init__(self, sector_technical_analysis_df) -> None:
        self.sector_technical_analysis_df = sector_technical_analysis_df

    def __call__(self, fundamental_filtered_sectors):
        df = self.sector_technical_analysis_df

        # Keep only the sectors that passed the fundamental filter.
        filtered_df = df.loc[df.index.isin(fundamental_filtered_sectors), :]

        filtered_df = self.append_score(filtered_df)
        filtered_sectors = self.get_filtered_sectors(filtered_df)
        return filtered_sectors

    @staticmethod
    def append_score(df):
        """Return a copy of ``df`` with ``Score = -CloseCoef - VolumeCoef``.

        The score is written to a copy, so the frame passed in (typically a
        filtered slice) is not modified and no chained-assignment warning is
        raised.

        NOTE(review): the per-stock score later in the notebook is
        ``-CloseCoef + VolumeCoef``; confirm the differing sign of VolumeCoef
        at sector level is intentional.
        """
        scored_df = df.copy()
        scored_df["Score"] = -scored_df["CloseCoef"] - scored_df["VolumeCoef"]
        return scored_df

    @staticmethod
    def get_filtered_sectors(df):
        """Return the index labels of the ``max(len(df) // 3, 1)`` highest scores."""
        n = max(len(df) // 3, 1)
        filtered_sectors = df.nlargest(n, "Score").index
        return filtered_sectors

## Run the analysis

In [7]:
# Show the newest date present in the OHLCV table. The Date column has not
# been parsed to datetime here (read_csv is called without parse_dates).
print(f"Current Latest Data : {ohlcv_df['Date'].max()}")

Current Latest Data : 2023-11-02


In [8]:
# Anchor the analysis window to the newest date present in the data instead of
# the wall clock. With `dt.datetime.now()` a re-run against the stored CSV
# produced a window containing no rows, so results were not reproducible.
WINDOW_WEEKS = 2
latest_data_date = pd.to_datetime(ohlcv_df["Date"]).max()

# OHLCV_PREPROCESSOR keeps rows with start_date < Date < end_date (both
# exclusive), so the window ends one day after the newest date to include it.
window_end = latest_data_date + dt.timedelta(days=1)

CFG = {
    "start_date": (window_end - dt.timedelta(weeks=WINDOW_WEEKS)).strftime("%Y-%m-%d"),
    "end_date": window_end.strftime("%Y-%m-%d"),
}


start_date = CFG["start_date"]
end_date = CFG["end_date"]

In [9]:
# Per-stock technical metrics (CloseCoef, VolumeCoef, NetProfit) over the
# configured date window.
technical_analyser = TECHNICAL_ANALYSER(ohlcv_df)
technical_analysis_df = technical_analyser.get_technical_analysis_df(start_date, end_date)

In [10]:
# Per-stock valuation ratios (PER, PBR, DebtPCT), averaged over the same window.
fundamental_analyser = FUNDAMENTAL_ANALYSER(ohlcv_df, fundamental_df, info_df)
fundamental_analysis_df = fundamental_analyser.get_fundamental_analysis_df(start_date, end_date)

In [11]:
# Aggregate both per-stock tables to (Market, Sector) means.
# Side effect: SECTOR_ANALYSER._map_sector_data writes Market and Sector columns
# onto the frames passed in, so technical_analysis_df and fundamental_analysis_df
# carry those columns after this cell; the final groupby in this notebook uses them.
sector_analyser = SECTOR_ANALYSER(sector_df)

sector_technical_analysis_df = sector_analyser.get_sector_technical_analysis_df(
    technical_analysis_df
)
sector_fundamental_analysis_df = sector_analyser.get_sector_fundamental_analysis_df(
    fundamental_analysis_df
)

In [12]:
# Stage 1: keep sectors whose mean PER / PBR / DebtPCT fall inside the bands.
sector_fundamental_analysis_filter = SECTOR_FUNDAMENTAL_ANALYSIS_FILTER(
    sector_fundamental_analysis_df
)
fundamental_filtered_sectors = sector_fundamental_analysis_filter()

# Stage 2: among those, keep the top third (at least one) by technical Score.
sector_technical_analysis_filter = SECTOR_TECHNICAL_ANALYSIS_FILTER(sector_technical_analysis_df)
filtered_sectors = sector_technical_analysis_filter(fundamental_filtered_sectors)

In [13]:
# (Market, Sector) pairs that passed both sector-level filters.
filtered_sectors

MultiIndex([( 'KOSPI', '비금속광물'),
            ('KOSDAQ',   '비금속')],
           names=['Market', 'Sector'])

In [14]:
# Select stocks whose (MarketName, SectorName) PAIR is one of the chosen
# sectors. Testing market and sector membership independently would also admit
# unselected combinations, e.g. a KOSPI stock in a sector that was only
# selected for KOSDAQ.
sector_pairs = pd.MultiIndex.from_frame(sector_df[["MarketName", "SectorName"]])
main_stocks_df = sector_df[sector_pairs.isin(list(filtered_sectors))]

In [15]:
# Unique stock codes of the selected sectors, in sorted order.
main_stocks = sorted(set(main_stocks_df["StockCode"]))

In [17]:
# Stock-level fundamental screen inside the selected sectors:
# positive PER and a liabilities-to-assets ratio strictly between 0.1 and 0.38.
# NOTE(review): these bounds differ from the sector-level DebtPCT band
# (0.15 to 0.4) — confirm the thresholds are intentional.
main_fundamental_analysis_df = fundamental_analysis_df[
    fundamental_analysis_df["StockCode"].isin(main_stocks)
    & (fundamental_analysis_df["PER"] > 0)
    & (fundamental_analysis_df["DebtPCT"] < 0.38)
    & (fundamental_analysis_df["DebtPCT"] > 0.1)
]

In [18]:
# Unique stock codes that passed the stock-level fundamental screen, sorted.
fundamental_filtered_main_stocks = sorted(set(main_fundamental_analysis_df["StockCode"]))

In [19]:
# Technical metrics for the screened stocks; copied so the Score column added
# in the next cell does not touch technical_analysis_df.
main_technical_analysis_df = technical_analysis_df[
    technical_analysis_df["StockCode"].isin(fundamental_filtered_main_stocks)
].copy()

In [20]:
# Stock-level score: volume trend minus close trend, so a falling price with
# rising volume scores highest.
# NOTE(review): the sector-level score in SECTOR_TECHNICAL_ANALYSIS_FILTER
# negates VolumeCoef as well — confirm the differing sign is intended.
main_technical_analysis_df["Score"] = (
    main_technical_analysis_df["VolumeCoef"] - main_technical_analysis_df["CloseCoef"]
)

In [21]:
# Highest score first; rebinding the name replaces the in-place sort.
main_technical_analysis_df = main_technical_analysis_df.sort_values("Score", ascending=False)

In [22]:
# Top two stocks by Score in each (Market, Sector); the frame is already sorted
# by Score, so head(2) takes the best two per group.
# The Market and Sector columns exist only because the sector-analysis cell
# above added them to technical_analysis_df as a side effect.
main_technical_analysis_df.groupby(['Market','Sector']).head(2)

Unnamed: 0,StockCode,CloseCoef,VolumeCoef,NetProfit,Market,Sector,Score
1,10040,-0.10947,-0.071934,-0.070447,KOSPI,비금속광물,0.037535
1,74600,-0.066667,-0.050211,-0.014286,KOSDAQ,비금속,0.016455
1,3300,0.052778,0.067958,0.011916,KOSPI,비금속광물,0.01518
1,75970,-0.095425,-0.093445,-0.052713,KOSDAQ,비금속,0.001979
