# Requirements

In [None]:
import os
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
from IPython.display import clear_output

plt.rc('font', family='Malgun Gothic')  

pd.set_option('mode.chained_assignment', None)

## Timeline strategy

### load database, aggregate fluctuation s by s

In [53]:
database_dir_path = "D:\Projects\Richer\QuantAnalyzer\Stock\database\KOSDAQ"

df_list_ = []
df_name_list = []
df_fluc_list = []

for p_i, pkl in enumerate(os.listdir(database_dir_path)):
    
    print(p_i)
    df = pd.read_excel(os.path.join(database_dir_path, pkl), index_col=0)
    # display(df.head())
    # break
    
    try:
        old_cols = ['시가', '고가', '저가', '현재가', '거래량']
        df = df[old_cols]
        df.columns = ['open', 'high', 'low', 'close', 'volume']
    except:
        pass

    # print(len(df))
    
    price_fluctuation = df.close / df.close.shift(1)  
    price_fluctuation.name = p_i
    
    df_list_.append(df)
    df_name_list.append(pkl)
    df_fluc_list.append(price_fluctuation)
    
    # if p_i > 20:
    #     break
    
                  
    clear_output(wait=True)

1704


### get top rank, alignment by fluctuation ranking.

In [None]:
df_flucs_rank = pd.concat(df_fluc_list, axis=1).rank(ascending=False, method='first', axis=1)

top_rank = 10
df_buy = df_flucs_rank <= top_rank
df_buy_shift = df_buy.shift(1)

### strategy config.

In [None]:
take_profit = 1.1  # 10%
stop_loss = 0.95   # -5%

fees = 0.00015 + 0.00015  # 0.015% 수수료
fee_trade = 0.0018

save_excel = 1
save_path = database_dir_path.replace('database', 'result')
os.makedirs(save_path, exist_ok=True)

show_plot = 0

#### buy in close_price

In [58]:
df_list_[1]

Unnamed: 0,open,high,low,close,volume,signal,buy,hold,sell,profit,매수신호,매수여부,보유여부,매도여부,매수가,수익률
2019-01-04 15:30:00,6580,6790,6310,6310,6425,False,,,,1,False,,,,,1
2019-01-07 15:30:00,6320,6850,6320,6750,6814,False,,,,1,False,,,,,1
2019-01-08 15:30:00,6760,6850,6610,6750,1921,False,,,,1,False,,,,,1
2019-01-09 15:30:00,6750,6840,6650,6760,5374,False,,,,1,False,,,,,1
2019-01-10 15:30:00,6660,6920,6610,6840,6861,False,,,,1,False,,,,,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2024-01-22 15:30:00,19000,19970,18520,19450,319785,False,,,,1,False,,,,,1
2024-01-23 15:30:00,19640,20150,18800,18830,180412,False,,,,1,False,,,,,1
2024-01-24 15:30:00,18900,19700,18190,18370,109235,False,,,,1,False,,,,,1
2024-01-25 15:30:00,18260,18260,17800,17850,68447,False,,,,1,False,,,,,1


In [57]:
symbol_names = []
buys  = []
holds = []
sells = []
profits = []
profit_cums = []

df_list = df_list_.copy()

col_i = 0  # symbol_index, for test.
for col_i, (_, df_name) in enumerate(zip(df_fluc_list, df_name_list)):

    print(col_i, end='\r')
    
    en_ex_total = []
    en_ex = []
    
    # 1. get symbol name from file name.
    # print(df_name)  # 2024-01-28 A000250_삼천당제약_1d.xlsx
    symbol_name = df_name.split(' ')[1].split('_')[1]
    # print(symbol_name)
    # break
    
    # 2. we will use symbol name as index.
    symbol_names.append(symbol_name)
    
    # executed = 0 # init.
    
    for row, signal in zip(df_list[col_i].iterrows(), df_buy.iloc[:, col_i]):

        en_ex_len = len(en_ex)

        # a. check buy signal.
            # i. buying symbol multiple times invalid.
        if signal and en_ex_len == 0:
            en_ex.append(row[-1].name)
            en_ex.append(row[-1]['close'])
            hold_ts = []

            
        if en_ex_len > 0:
            
            # # b. check buy execution.
            # if not executed:
            #     if row[-1]['low'] < en_ex[1]:
            #         executed = 1
                    
            # c. check sell signal.
            # else:                
                exit_done = 0
    
                #     i. take_profit
                if row[-1]['high'] / en_ex[1] >= take_profit:
                    profit = take_profit * (1 - fees - fee_trade) # 증권거래세 등(0.18%)
                    exit_done = 1
                    
                #     ii. loss_cut
                elif row[-1]['low'] / en_ex[1] <= stop_loss:
                    profit = stop_loss * (1 - fees)
                    exit_done = -1
    
                #     iii. result.
                #         1. if exit_done, trade end.
                if exit_done:
                    en_ex.append(row[-1].name)
                    en_ex.append(exit_done)
                    en_ex.append(profit)
                    en_ex.append(hold_ts)
                    
                    en_ex_total.append(en_ex)
                    en_ex = []
                    
                    # executed = 0
                    
                else:
                    hold_ts.append(row[-1].name)
                    # en_ex.append(row[-1].name)
                    # en_ex.append(exit_done)
                    # en_ex.append(np.nan)
                


    en_ex_total_arr = np.array(en_ex_total, dtype=object)

    # 3. initialization
    df_list[col_i]['매수신호'] = df_buy.iloc[:, col_i]
    df_list[col_i]['매수여부'] = np.nan
    df_list[col_i]['보유여부'] = np.nan
    df_list[col_i]['매도여부'] = np.nan
    df_list[col_i]['매수가'] = np.nan
    df_list[col_i]['수익률'] = 1
    # df_list[col_i]['profit_cum'] = 1

    # 4. enlist trade info.
    #    a. profit_cum is invalid, cause accumulated profit could involve out ranked profit.
    if len(en_ex_total_arr) > 0:
        df_list[col_i]['매수여부'].loc[en_ex_total_arr[:, 0]] = 1
        df_list[col_i]['보유여부'].loc[np.hstack(en_ex_total_arr[:, -1])] = 1
        df_list[col_i]['매도여부'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 3]
        df_list[col_i]['매수가'].loc[en_ex_total_arr[:, 0]] = en_ex_total_arr[:, 1]
        df_list[col_i]['수익률'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 4]
        # df_list[col_i]['profit_cum'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 4]
        # df_list[col_i]['profit_cum'] = np.cumprod(df_list[col_i]['profit'])
        # df_list[col_i]['profit_cum'] = np.cumprod(df_list[col_i]['profit_cum'])
        
    buys.append(df_list[col_i]['매수여부']) # .to_numpy()
    holds.append(df_list[col_i]['보유여부']) # .to_numpy()
    sells.append(df_list[col_i]['매도여부']) # .to_numpy()
    profits.append(df_list[col_i]['수익률']) # .to_numpy()
    # profit_cums.append(df_list[col_i]['profit_cum']) # .to_numpy()

    # display(df_list[col_i])

    if show_plot:
        plt.figure(figsize=(5,5))
        plt.plot(np.nancumprod(df_list[col_i]['profit']))
        plt.title(df_name_list[col_i])
        plt.show()
    
    if save_excel:
        df_list[col_i].to_excel(os.path.join(save_path, '{}'.format(df_name_list[col_i])), index=True)
    
    
    # print(len(df_list[col_i]['buy'].to_numpy()))
    # print(len(df_list[col_i]))
    # break

10

KeyboardInterrupt: 

#### buy in close_price at the next day.

In [None]:
symbol_names = []
buys  = []
holds = []
sells = []
profits = []
profit_cums = []

df_list = df_list_.copy()


col_i = 0  # symbol_index, for test.
for col_i, (_, df_name) in enumerate(zip(df_fluc_list, df_name_list)):

    print(col_i, end='\r')
    
    en_ex_total = []
    en_ex = []
    
    # 1. get symbol name from file name.
    # print(df_name)  # 2024-01-28 A000250_삼천당제약_1d.xlsx
    symbol_name = df_name.split(' ')[1].split('_')[1]
    # print(symbol_name)
    # break
    
    # 2. we will use symbol name as index.
    symbol_names.append(symbol_name)
    
    executed = 0 # init.
    
    for row, signal in zip(df_list[col_i].iterrows(), df_buy.iloc[:, col_i]):

        en_ex_len = len(en_ex)

        # a. check buy signal.
            # i. buying symbol multiple times invalid.
        if signal and en_ex_len == 0:
            en_ex.append(row[-1].name)
            en_ex.append(row[-1]['close'])
            hold_ts = []

            
        if en_ex_len > 0:
            
            # b. check buy execution.
            if not executed:
                if row[-1]['low'] < en_ex[1]:
                    executed = 1
                    
            # c. check sell signal.
            else:                
                exit_done = 0
    
                #     i. take_profit
                if row[-1]['high'] / en_ex[1] >= take_profit:
                    profit = take_profit * (1 - fees - fee_trade) # 증권거래세 등(0.18%)
                    exit_done = 1
                    
                #     ii. loss_cut
                elif row[-1]['low'] / en_ex[1] <= stop_loss:
                    profit = stop_loss * (1 - fees)
                    exit_done = -1
    
                #     iii. result.
                #         1. if exit_done, trade end.
                if exit_done:
                    en_ex.append(row[-1].name)
                    en_ex.append(exit_done)
                    en_ex.append(profit)
                    en_ex.append(hold_ts)
                    
                    en_ex_total.append(en_ex)
                    en_ex = []
                    
                    executed = 0
                    
                else:
                    hold_ts.append(row[-1].name)
                    # en_ex.append(row[-1].name)
                    # en_ex.append(exit_done)
                    # en_ex.append(np.nan)
                


    en_ex_total_arr = np.array(en_ex_total, dtype=object)

    # 3. initialization
    df_list[col_i]['buy'] = np.nan
    df_list[col_i]['hold'] = np.nan
    df_list[col_i]['sell'] = np.nan
    df_list[col_i]['profit'] = 1
    # df_list[col_i]['profit_cum'] = 1

    # 4. enlist trade info.
    #    a. profit_cum is invalid, cause accumulated profit could involve out ranked profit.
    if len(en_ex_total_arr) > 0:
        df_list[col_i]['buy'].loc[en_ex_total_arr[:, 0]] = 1
        df_list[col_i]['hold'].loc[np.hstack(en_ex_total_arr[:, -1])] = 1
        df_list[col_i]['sell'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 3]
        df_list[col_i]['profit'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 4]
        # df_list[col_i]['profit_cum'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 4]
        # df_list[col_i]['profit_cum'] = np.cumprod(df_list[col_i]['profit'])
        # df_list[col_i]['profit_cum'] = np.cumprod(df_list[col_i]['profit_cum'])
        
    buys.append(df_list[col_i]['buy']) # .to_numpy()
    holds.append(df_list[col_i]['hold']) # .to_numpy()
    sells.append(df_list[col_i]['sell']) # .to_numpy()
    profits.append(df_list[col_i]['profit']) # .to_numpy()
    # profit_cums.append(df_list[col_i]['profit_cum']) # .to_numpy()

    # display(df_list[col_i])

    if show_plot:
        plt.figure(figsize=(5,5))
        plt.plot(np.nancumprod(df_list[col_i]['profit']))
        plt.title(df_name_list[col_i])
        plt.show()
    
    if save_excel:
        df_list[col_i].to_excel(os.path.join(save_path, '{}'.format(df_name_list[col_i])), index=True)
    
    
    # print(len(df_list[col_i]['buy'].to_numpy()))
    # print(len(df_list[col_i]))
    # break

#### buy in open_price

In [None]:
symbol_names = []
buys  = []
holds = []
sells = []
profits = []
profit_cums = []

df_list = df_list_.copy()

col_i = 0  # symbol_index, for test.
for col_i, (_, df_name) in enumerate(zip(df_fluc_list, df_name_list)):

    print(col_i, end='\r')
    
    en_ex_total = []
    en_ex = []
    
    # 1. get symbol name from file name.
    # print(df_name)  # 2024-01-28 A000250_삼천당제약_1d.xlsx
    symbol_name = df_name.split(' ')[1].split('_')[1]
    # print(symbol_name)
    # break
    
    # 2. we will use symbol name as index.
    symbol_names.append(symbol_name)
    

    for row, signal in zip(df_list[col_i].iterrows(), df_buy_shift.iloc[:, col_i]):

        en_ex_len = len(en_ex)

        # a. check buy signal.
            # i. buying symbol multiple times invalid.
        if not pd.isnull(signal):
            if signal and en_ex_len == 0:
                en_ex.append(row[-1].name)
                en_ex.append(row[-1]['open'])
                hold_ts = []
            
        en_ex_len = len(en_ex)
        
        # b. check sell signal.
        if en_ex_len > 0:

            exit_done = 0

            # i. take_profit
            if row[-1]['high'] / en_ex[1] >= take_profit:
                profit = take_profit * (1 - fees - fee_trade) # 증권거래세 등(0.18%)
                exit_done = 1
            elif row[-1]['low'] / en_ex[1] <= stop_loss:
                profit = stop_loss * (1 - fees)
                exit_done = -1

            if exit_done:
                en_ex.append(row[-1].name)
                en_ex.append(exit_done)
                en_ex.append(profit)
                en_ex.append(hold_ts)
                en_ex_total.append(en_ex)
                en_ex = []
            else:
                hold_ts.append(row[-1].name)
                # en_ex.append(row[-1].name)
                # en_ex.append(exit_done)
                # en_ex.append(np.nan)
                


    en_ex_total_arr = np.array(en_ex_total, dtype=object)

    # 3. initialization
    df_list[col_i]['signal'] = df_buy.iloc[:, col_i]
    df_list[col_i]['buy'] = np.nan
    df_list[col_i]['hold'] = np.nan
    df_list[col_i]['sell'] = np.nan
    df_list[col_i]['profit'] = 1
    # df_list[col_i]['profit_cum'] = 1

    # 4. enlist trade info.
    #    a. profit_cum is invalid, cause accumulated profit could involve out ranked profit.
    if len(en_ex_total_arr) > 0:
        df_list[col_i]['buy'].loc[en_ex_total_arr[:, 0]] = 1
        df_list[col_i]['hold'].loc[np.hstack(en_ex_total_arr[:, -1])] = 1
        df_list[col_i]['sell'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 3]
        df_list[col_i]['profit'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 4]
        # df_list[col_i]['profit_cum'].loc[en_ex_total_arr[:, 2]] = en_ex_total_arr[:, 4]
        # df_list[col_i]['profit_cum'] = np.cumprod(df_list[col_i]['profit'])
        # df_list[col_i]['profit_cum'] = np.cumprod(df_list[col_i]['profit_cum'])
        
    buys.append(df_list[col_i]['buy']) # .to_numpy()
    holds.append(df_list[col_i]['hold']) # .to_numpy()
    sells.append(df_list[col_i]['sell']) # .to_numpy()
    profits.append(df_list[col_i]['profit']) # .to_numpy()
    # profit_cums.append(df_list[col_i]['profit_cum']) # .to_numpy()

    # display(df_list[col_i])

    if show_plot:
        plt.figure(figsize=(5,5))
        plt.plot(np.nancumprod(df_list[col_i]['profit']))
        plt.title(df_name_list[col_i])
        plt.show()
    
    if save_excel:
        df_list[col_i].to_excel(os.path.join(save_path, '{}'.format(df_name_list[col_i])), index=True)
    
    
    # print(len(df_list[col_i]['buy'].to_numpy()))
    # print(len(df_list[col_i]))
    # break

#### get result.

In [None]:
"""
aggregate traded data.
"""
pos_buy = pd.DataFrame(data=buys, index=symbol_names).transpose()
pos_hold = pd.DataFrame(data=holds, index=symbol_names).transpose()
pos_sell = pd.DataFrame(data=sells, index=symbol_names).transpose()
pos_profit = pd.DataFrame(data=profits, index=symbol_names).transpose()
# pos_profit_cum = pd.DataFrame(data=profit_cums, index=symbol_names).transpose()

show_print = 0
show_plot_res = 1

profits_list = []
# profits_cum_list = []

for ts_i in range(len(pos_buy)):

    # 0. Take trade info day by day.
    buy_info = pos_buy.iloc[ts_i]
    hold_info = pos_hold.iloc[ts_i]
    sell_info = pos_sell.iloc[ts_i]
    profit_info = pos_profit.iloc[ts_i]
    # profit_cum_info = pos_profit_cum.iloc[ts_i]    

    # 1. Take traded symbols.
    buy_on = buy_info[buy_info == 1]
    hold_on = hold_info[hold_info == 1]
    sell_on = sell_info[(sell_info == 1) | (sell_info == -1)]
    # profit_on = np.cumprod(profit_info[~pd.isnull(profit_info).to_numpy()])
    profit_on = np.nancumprod(profit_info)
    # print("profit_on :", profit_on)
    # if len(profit_on) == 0:
    #     profit_on = 1
    # else:
    #     profit_on = profit_on[-1]
    # profit_cum_on = np.cumprod(profit_cum_info[~pd.isnull(profit_cum_info)].to_numpy())[-1]
    # profit_cum_on = np.nancumprod(profit_cum_info)
    
    # print("(profit_info[~pd.isnull(profit_info)] :", profit_info[~pd.isnull(profit_info)].to_numpy())
    # print("(profit_cum_info[~pd.isnull(profit_cum_info)] :", profit_cum_info[~pd.isnull(profit_cum_info)].to_numpy())
    # break

    if show_print:
        print("기준일 :", buy_on.name)
        print("매수 종목 :", buy_on.index.values)
        print("보유 종목 :", hold_on.index.values)
        print("매도 종목 :", sell_on.index.values)
        print("일별수익률 : {:.3f}%".format(profit_on[-1]))
        print()

    profits_list.append(profit_on[-1])
    
    if ts_i > 100:
        break


#### plot cum profit

In [None]:
profit_on_cum = np.cumprod(profits_list)
print("누적수익률 : {:.3f}%".format(profit_on_cum[-1]))

if show_plot_res:
    plt.figure(figsize=(5,5))
    plt.plot(profit_on_cum)
    plt.show()