In [59]:
CFG = {
    "dataset_window": 200,
    "input_window": 15,
    "output_window": 5,
    "sampling_cnt": 150,
}

In [60]:
"""
General

DataLoader
    - STATUS_LOADER
    - SYMBOL_LOADER

"""
# LOADER
import datetime as dt
import pandas as pd
import kquant as kq


class STATUS_LOADER:
    """
    STATUS_LOADER : 상태 정보 추출 클래스

    Methods:
        - __init__
        - get_current_cash
        - get_status_df
    """

    def __init__(self, dict_df_result, dict_df_position) -> None:
        """
        STATUS_LOADER의 생성자

        Args:
            dict_df_result (dict(str,pd.DataFrame))
            dict_df_position (dict(str,pd.DataFrame))

        """
        self.dict_df_result = dict_df_result
        self.dict_df_position = dict_df_position

    def get_current_cash(self) -> float:
        """
        현재 보유 현금을 반환하는 메서드
            - 만약 dict_df_result에서 CASH column을 찾을 수 없다면 초기투자금인 10억을 반환합니다.

        Returns:
            float: 현재 보유 현금
        """
        _dict_df_result = self.dict_df_result
        try:
            _df_result_total = _dict_df_result["TOTAL"]
            _current_cash = (
                _df_result_total.sort_values("DATE").tail(1)["CASH"].values[0]
            )
            return _current_cash
        except:
            return 1_000_000_000.0

    def get_status_df(self) -> pd.DataFrame:
        """
        현재 보유 position 관련 정보를 반환하는 메서드

        Returns:
            pd.DataFrame: columns = [SYMBOL, CURRENT_QTY, CURRENT_PRICE, TRADE_PRICE]
        """
        current_symbol_list = list()
        _dict_df_result = self.dict_df_result
        _dict_df_position = self.dict_df_position

        _total_symbols = sorted(_dict_df_position.keys())

        for _symbol in _total_symbols:
            try:
                _symbol_result_df = _dict_df_result[_symbol]
                _symbol_position_df = _dict_df_position[_symbol]

                _current_price = (
                    _symbol_result_df.sort_values("DATE").tail(1)["PRICE"].values[0]
                )
                _trade_price = _symbol_position_df["TRADE_PRICE"].values[0]
                _current_qty = _symbol_position_df["QTY"].values[0]

                current_symbol_list.append(
                    {
                        "SYMBOL": _symbol,
                        "CURRENT_QTY": _current_qty,
                        "CURRENT_PRICE": _current_price,
                        "TRADE_PRICE": _trade_price,
                    }
                )
            except:
                pass
        return pd.DataFrame(
            current_symbol_list,
            columns=["SYMBOL", "CURRENT_QTY", "CURRENT_PRICE", "TRADE_PRICE"],
        )


class SYMBOL_LOADER:
    """
    SYMBOL_LOADER : 주식 symbol 정보 추출 클래스

    Inner_Classes :
        SYMBOl_FILTER

    Methods :
        - filter_symbols_df
        - get_symbols
        - __call__
    """

    @staticmethod
    def load_symbols_df() -> pd.DataFrame:
        """
        symbols_df를 호출하는 메서드

        Returns:
            pd.DataFrame :
        """
        symbols_df = kq.symbol_stock()
        return symbols_df

    class SYMBOL_FILTER:
        """
        SYMBOl_FILTER : 주식 symbol을 filtering 하는 클래스

        Methods :
                - filter__market
                - filter__admin_issue
                - filter__sec_type
        """

        @staticmethod
        def filter__market(symbols_df: pd.DataFrame) -> pd.DataFrame:
            """
            market에 대한 필터링을 진행하는 메서드

            Returns:
                pd.DataFrame : MARKET이 [코스닥, 유가증권]에 속하는 row만 유지
            """
            filtered_symbols_df = symbols_df[
                (symbols_df["MARKET"].isin(["코스닥", "유가증권"]))
            ].copy()
            return filtered_symbols_df

        @staticmethod
        def filter__admin_issue(symbols_df: pd.DataFrame) -> pd.DataFrame:
            """
            ADMIN_ISSUE에 대한 필터링을 진행하는 메서드

            Returns:
                pd.DataFrame : ADMIN_ISSUE가 0 인 row만 유지
            """
            filtered_symbols_df = symbols_df[(symbols_df["ADMIN_ISSUE"] == 0)].copy()
            return filtered_symbols_df

        @staticmethod
        def filter_sec_type(symbols_df: pd.DataFrame) -> pd.DataFrame:
            """
            SEC_TYPE에 대한 필터링을 진행하는 메서드

            Returns:
                pd.DataFrame : SEC_TYPE이 [ST, EF, EN]에 속하는 row만 유지
            """
            filtered_symbols_df = symbols_df[
                symbols_df["SEC_TYPE"].isin(["ST", "EF", "EN"])
            ].copy()
            return filtered_symbols_df

    def filter_symbols_df(self, symbols_df: pd.DataFrame) -> pd.DataFrame:
        """
        symbol_df 에 대한 필터링을 진행하는 메서드

        Returns:
                pd.DataFrame : SYMBOl_FILTER의 필터 메서드를 거친 row만 유지
        """
        symbol_filter = self.SYMBOL_FILTER()
        filtered_symbols_df = symbol_filter.filter__market(symbols_df)
        filtered_symbols_df = symbol_filter.filter__admin_issue(filtered_symbols_df)
        filtered_symbols_df = symbol_filter.filter_sec_type(filtered_symbols_df)
        return filtered_symbols_df

    @staticmethod
    def get_symbols(symbols_df: pd.DataFrame) -> list:
        """
        symbols_df의 symbol을 중복을 제거하여 추출하는 메서드

        Returns:
            list : symbols
        """
        symbols = sorted(set(symbols_df["SYMBOL"]))
        return symbols

    # SYMBOL_LOADER PIPELINE
    def __call__(self) -> list:
        """
        SYMBOL_LOADER의 파이프라인을 제공하는 메서드

        Returns:
            list : symbols
        """
        symbols_df = self.load_symbols_df()
        filtered_symbols_df = self.filter_symbols_df(symbols_df)
        symbols = self.get_symbols(filtered_symbols_df)
        return symbols
    

In [61]:
"""
Specific

DataLoader
    - TECHNICAL_LOADER
    - SIMILARITY_PROCESSOR
"""

class TECHNICAL_LOADER:
    def __init__(self, symbols) -> None:
        self.symbols = symbols

    @staticmethod
    def load_stock_data_df(symbols):
        stock_data_df_list = list()
        for symbol in tqdm(symbols):
            _stock_data_df = kq.daily_stock(symbol)
            stock_data_df_list.append(_stock_data_df)
        stock_data_df = pd.concat(stock_data_df_list, axis=0)
        return stock_data_df

    class STOCK_FILTER:
        @staticmethod
        def filter__zero_row(stock_data_df):
            columns = ["VOLUME", "OPEN", "HIGH", "LOW", "CLOSE"]
            filtered_stock_data_df = stock_data_df[
                (stock_data_df.loc[:, columns].all(axis=1))
            ].copy()
            return filtered_stock_data_df

        @staticmethod
        def filter__na_row(stock_data_df):
            columns = ["VOLUME", "OPEN", "HIGH", "LOW", "CLOSE"]
            filtered_stock_data_df = stock_data_df[
                ~(stock_data_df[columns].isna().any(axis=1))
            ].copy()
            return filtered_stock_data_df

        @staticmethod
        def filter__cnt(stock_data_df):
            stock_cnt_series = stock_data_df.groupby("SYMBOL").count()["CLOSE"]
            stock_mode_cnt = stock_cnt_series.mode()[0]

            cnt_filtered_series = stock_cnt_series[stock_cnt_series == stock_mode_cnt]
            cnt_filtered_stocks = cnt_filtered_series.index

            filtered_stock_data_df = stock_data_df[
                stock_data_df["SYMBOL"].isin(cnt_filtered_stocks)
            ].copy()
            return filtered_stock_data_df

    def filter_stock_data_df(self, stock_data_df):
        stock_filter = self.STOCK_FILTER()
        filtered_stock_data_df = stock_filter.filter__zero_row(stock_data_df)
        filtered_stock_data_df = stock_filter.filter__na_row(filtered_stock_data_df)
        filtered_stock_data_df = stock_filter.filter__cnt(filtered_stock_data_df)
        return filtered_stock_data_df

    def __call__(self):
        stock_data_df = self.load_stock_data_df(self.symbols)
        filtered_stock_data_df = self.filter_stock_data_df(stock_data_df)
        return filtered_stock_data_df

class SIMILARITY_PROCESSOR:
    """
    def __init__(self, stock_df, CFG, n) -> None:
        self.stock_df = stock_df
        self.CFG = CFG
        self.n = n

        return None
    """

    def __init__(self, stock_df, CFG, n) -> None:
        self.stock_df = stock_df
        self.CFG = CFG
        self.n = n

        return None

    def get_x_y_dataset(self, array_ist):
        CFG = self.CFG
        i_window = CFG["input_window"]
        o_window = CFG["output_window"]

        x_dataset = list()
        y_dataset = list()

        for idx in range(len(array_ist) - i_window - o_window + 1):
            _x = array_ist[idx : idx + i_window]
            _y = array_ist[idx + i_window : idx + i_window + o_window]
            x_dataset.append(_x)
            y_dataset.append(_y)

        x_dataset = np.array(x_dataset)
        y_dataset = np.array(y_dataset).sum(axis=1)
        final_x = array_ist[-i_window:]

        return (final_x, x_dataset, y_dataset)

    class SIMILARITY_MODEL_MAIN:
        def get_similarity_main_df(self, x_dataset, y_dataset, final_x, n):
            def get_cosine_similarity(array_1, array_2):
                cosine_similarity = np.dot(array_1, array_2) / (
                    np.linalg.norm(array_1) * np.linalg.norm(array_2)
                )
                return cosine_similarity

            similarity_results = list()
            for x_data, y_data in zip(x_dataset, y_dataset):
                _similarity_score = get_cosine_similarity(x_data, final_x)
                similarity_results.append(
                    {
                        "similarity_score": _similarity_score,
                        "actual_y": y_data,
                    }
                )
            similarity_df = pd.DataFrame(similarity_results)
            similarity_main_df = similarity_df.nlargest(n, "similarity_score")
            return similarity_main_df

        @staticmethod
        def get_pred_y(similarity_df):
            pred_y = (
                similarity_df["similarity_score"] * similarity_df["actual_y"]
            ).mean()
            return pred_y

        def __call__(self, final_x, x_dataset, y_dataset, n):
            similarity_main_df = self.get_similarity_main_df(
                x_dataset, y_dataset, final_x, n
            )
            pred_y = self.get_pred_y(similarity_main_df)
            return pred_y

    def __call__(self):
        stock_df = self.stock_df
        array_list = stock_df["CHG_PCT"].values
        final_x, x_dataset, y_dataset = self.get_x_y_dataset(array_list)

        similarity_model_main = self.SIMILARITY_MODEL_MAIN()
        pred_y = similarity_model_main(final_x, x_dataset, y_dataset, self.n)
        return pred_y

In [62]:
class ORDER_PROCESSOR:
    class GET_BUYING_ORDERS:
        def __init__(self, result_df, daily_invest_money, position_symbols) -> None:
            result_df = result_df[result_df["SCORE"] > 0]
            self.result_df = result_df
            self.daily_invest_money = daily_invest_money
            self.position_symbols = position_symbols

        @staticmethod
        def filter_position_symbols(result_df: pd.DataFrame, position_symbols: list):
            filtered_result_df = result_df[
                ~(result_df["SYMBOL"].isin(position_symbols))
            ]
            return filtered_result_df

        @staticmethod
        def get_high_score_df(result_df: pd.DataFrame) -> pd.DataFrame:
            high_score_df = result_df.nsmallest(5, "SCORE")
            return high_score_df

        @staticmethod
        def append_score_weight(high_score_df: pd.DataFrame) -> pd.DataFrame:
            high_score_df["SCORE_WEIGHT"] = (
                high_score_df["SCORE"] / high_score_df["SCORE"].sum()
            )
            return high_score_df

        @staticmethod
        def append_price_invest(
            high_score_df: pd.DataFrame, daily_invest_money: float
        ) -> pd.DataFrame:
            high_score_df["PRICE_INVEST"] = (
                high_score_df["SCORE_WEIGHT"] / high_score_df["SCORE_WEIGHT"].sum()
            ) * daily_invest_money
            return high_score_df

        @staticmethod
        def append_cnt_invest(high_score_df: pd.DataFrame) -> pd.DataFrame:
            high_score_df["CNT_INVEST"] = (
                high_score_df["PRICE_INVEST"] // high_score_df["CLOSE"]
            )
            return high_score_df

        def __call__(self):
            result_df = self.result_df
            result_df = self.filter_position_symbols(result_df, self.position_symbols)
            high_score_df = self.get_high_score_df(result_df)
            high_score_df = self.append_score_weight(high_score_df)
            high_score_df = self.append_price_invest(
                high_score_df, self.daily_invest_money
            )
            high_score_df = self.append_cnt_invest(high_score_df)
            buying_orders = list(
                high_score_df.set_index("SYMBOL")["CNT_INVEST"]
                .astype(int)
                .to_dict()
                .items()
            )
            return buying_orders

In [63]:
import random
import logging
import datetime as dt
import pandas as pd


def trade_func(
    date: dt.date,
    dict_df_result: dict[str, pd.DataFrame],
    dict_df_position: dict[str, pd.DataFrame],
    logger: logging.Logger,
) -> list[tuple[str, int]]:
    """
    STATUS_LOADER
        : get_current_cash()
            -> 현재 가용 가능한 현금을 가져옵니다.
        : get_status_df()
            -> 현재 포지션이 있는 주식들에 대한 정보를 가져옵니다.
    """
    status_loader = STATUS_LOADER(dict_df_result, dict_df_position)

    current_cash = status_loader.get_current_cash()
    daily_invest_money = current_cash / 2
    status_df = status_loader.get_status_df()
    position_symbols = sorted(set(status_df["SYMBOL"]))

    """
    SYMBOL_LOADER
        : __call__()
            -> 현재 시장에서 거래 가능한 symbol을 모두 가져옵니다.
    """
    symbol_loader = SYMBOL_LOADER()
    total_symbols = symbol_loader()

    sampled_symbols = random.sample(
        total_symbols,
        CFG["sampling_cnt"],
    )
    using_symbols = sorted(set(sampled_symbols + position_symbols))
    """
    TECHNICAL_LOADER
        : __call__()
            -> symbols에 대하여, technical_analysis를 위해 필요한 데이터를 추출합니다.
    """
    technical_loader = TECHNICAL_LOADER(using_symbols)
    stocks_df = technical_loader()
    """
    SIMILARITY_MODEL
        : __call__()
            -> technical analysis 기반 결과 출력
    """
    symbols = sorted(set(stocks_df["SYMBOL"]))
    results = list()
    for symbol in symbols:
        stock_df = stocks_df[stocks_df["SYMBOL"] == symbol]
        close = stock_df.sort_values("DATE").head(1)["CLOSE"].values[0]

        similarity_model = SIMILARITY_PROCESSOR(stock_df, CFG, 5)
        pred_y = similarity_model()

        results.append({"SYMBOL": symbol, "SCORE": pred_y, "CLOSE": close})
    results_df = pd.DataFrame(results)

    order_processor = ORDER_PROCESSOR.GET_BUYING_ORDERS(
        results_df, daily_invest_money, position_symbols
    )
    symbols_and_orders = order_processor()
    return symbols_and_orders

In [64]:
# loop
dict_df_result = kq.backtest_stock_port_daily(
    trade_func,
    "2023-08-11",  # 실제 심사에서는 투자기간 시작일
    "2023-09-11",  # 실제 심사에서는 투자기간 종료일
    init_cash=1_000_000_000,  # 10억원
)


100%|██████████| 150/150 [01:08<00:00,  2.19it/s]
[2023-08-11] 종목: 328380, 주문전 보유수량:      0 주문수량: 10,669, 매매수량: 10,669, 주문후 보유수량: 10,669
[2023-08-11] 종목: 052900, 주문전 보유수량:      0 주문수량: 34,037, 매매수량: 34,037, 주문후 보유수량: 34,037
[2023-08-11] 종목: 305540, 주문전 보유수량:      0 주문수량:  4,200, 매매수량:  4,200, 주문후 보유수량:  4,200
[2023-08-11] 종목: 002995, 주문전 보유수량:      0 주문수량:  5,997, 매매수량:  5,997, 주문후 보유수량:  5,997
[2023-08-11] 종목: 084680, 주문전 보유수량:      0 주문수량: 114,347, 매매수량: 114,347, 주문후 보유수량: 114,347
100%|██████████| 155/155 [01:11<00:00,  2.16it/s]
[2023-08-14] 종목: 406760, 주문전 보유수량:      0 주문수량:  2,148, 매매수량:  2,148, 주문후 보유수량:  2,148
[2023-08-14] 종목: 078350, 주문전 보유수량:      0 주문수량:  1,788, 매매수량:  1,788, 주문후 보유수량:  1,788
[2023-08-14] 종목: 382840, 주문전 보유수량:      0 주문수량:  2,541, 매매수량:  2,541, 주문후 보유수량:  2,541
[2023-08-14] 종목: 014440, 주문전 보유수량:      0 주문수량: 18,319, 매매수량: 18,319, 주문후 보유수량: 18,319
[2023-08-14] 종목: 700016, 주문전 보유수량:      0 주문수량:  4,771, 매매수량:  4,771, 주문후 보유수량:  4,771
100%|██████████| 158/158 [0

In [65]:
dict_df_result['TOTAL']['TOTAL_VALUE'].tail(1)

20    973,995,117
Name: TOTAL_VALUE, dtype: int64