In [72]:
# Imports

import numpy as np
import pandas as pd
import datetime as dt

import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

# Global matplotlib settings applied to every figure in this notebook.
# NOTE(review): 'AppleGothic' is presumably chosen so the Korean titles/labels
# render; the font is normally only installed on macOS - confirm before
# running elsewhere.
plt.rcParams.update(
    {
        'font.family': 'AppleGothic',
        # Draw minus signs with the ASCII hyphen instead of the Unicode minus glyph.
        'axes.unicode_minus': False,
    }
)

from KFinanceDataReader import KFinanceDataReader
from private import dart_api_key

In [73]:
# Parameters
stock_code = '053450'
n_year = 5

## Date
# The analysis window ends today and starts `n_year` blocks of 365 days earlier.
# Both bounds end up as 'YYYY-MM-DD' strings.
end_date = dt.date.today()
start_date = (end_date - dt.timedelta(days=365 * n_year)).isoformat()
end_date = end_date.isoformat()

In [74]:
# Functions

In [75]:
## Load

In [76]:
## Preproc
def get_general_info(stock_code, kospi_info_df, kosdaq_info_df, krx_df):
    """Build a one-line summary (name, market, sector, market-cap rank) for a stock.

    Args:
        stock_code: Value matched against the ``stock_code`` column of the
            listing frames and the ``Code`` column of ``krx_df``.
        kospi_info_df: Listing frame with at least ``stock_code`` and ``sector``
            columns; its rows are tagged as market ``'KOSPI'``.
        kosdaq_info_df: Same layout; its rows are tagged as market ``'KOSDAQ'``.
        krx_df: Frame with ``Code`` and ``Marcap`` columns used for ranking.

    Returns:
        A string of the form
        ``"<name> : 마켓:<market> / 섹터:<sector> / 시총순위:<n> 중 <rank>위"``.
        The rank part is ``'Unknown'`` when the stock is missing from
        ``krx_df`` or its ``Marcap`` is NaN.

    Raises:
        IndexError: If ``stock_code`` is not present in either listing frame.
    """
    # ``assign`` returns new frames, so the caller's inputs are not modified.
    info_df = pd.concat(
        [
            kospi_info_df.assign(market='KOSPI'),
            kosdaq_info_df.assign(market='KOSDAQ'),
        ],
        axis=0,
    )

    _info_df = info_df[info_df['stock_code'] == stock_code]
    if _info_df.empty:
        # Same exception type ``iloc[0]`` would raise, with an actionable message.
        raise IndexError(f"stock_code {stock_code!r} not found in the KOSPI/KOSDAQ listings")
    _info_dict = _info_df.iloc[0].to_dict()

    # Peers are the stocks listed on the same market AND in the same sector.
    same_sector_df = info_df[(info_df['market'] == _info_dict['market']) & (info_df['sector'] == _info_dict['sector'])]
    same_sector_krx_df = krx_df[krx_df['Code'].isin(same_sector_df['stock_code'])]

    # Rank 1 is the largest market cap among the peers.
    stock_marcap_rank = same_sector_krx_df.set_index('Code')['Marcap'].rank(ascending=False).get(stock_code, None)
    # A NaN Marcap yields a NaN rank, which is truthy and cannot be cast to int,
    # so it has to be checked explicitly.
    if stock_marcap_rank is not None and pd.notna(stock_marcap_rank):
        _info_dict['marcap_rank'] = f"{len(same_sector_krx_df)} 중 {int(stock_marcap_rank)}위"
    else:
        _info_dict['marcap_rank'] = 'Unknown'

    general_info = (
        f"{_info_dict.get('stock_name', 'Unknown')} : "
        f"마켓:{_info_dict.get('market', 'Unknown')} / "
        f"섹터:{_info_dict.get('sector', 'Unknown')} / "
        f"시총순위:{_info_dict.get('marcap_rank', 'Unknown')}"
    )
    return general_info


def preproc(dataframe):
    """Return a copy of ``dataframe`` with a datetime index, sorted by that index.

    Args:
        dataframe: pandas object whose index values can be parsed by
            ``pd.to_datetime``.

    Returns:
        A new object with a converted, ascending-sorted index. The input is
        left untouched, so the function can be re-run on the same raw frame.
    """
    # A shallow copy is enough: only the index is replaced, and sort_index()
    # produces a new object anyway.
    converted = dataframe.copy(deep=False)
    converted.index = pd.to_datetime(converted.index)
    return converted.sort_index()


def get_series(dataframe, col):
    """Return the column ``col`` of ``dataframe`` (plain ``dataframe[col]`` lookup)."""
    return dataframe[col]


def calc_volume_df(stock_trader_df):
    """Sum the buy and sell columns per trader group.

    Returns a frame with one column per group ('전체', '기관', '외국인', '개인'),
    each being ``<group>_매수 + <group>_매도`` from ``stock_trader_df``.
    """
    # (output column, buy column, sell column)
    column_triples = (
        ('전체', '전체_매수', '전체_매도'),
        ('기관', '기관_매수', '기관_매도'),
        ('외국인', '외국인_매수', '외국인_매도'),
        ('개인', '개인_매수', '개인_매도'),
    )
    summed_columns = [
        (stock_trader_df[buy_col] + stock_trader_df[sell_col]).rename(out_col)
        for out_col, buy_col, sell_col in column_triples
    ]
    return pd.concat(summed_columns, axis=1)


def calc_net_buy_df(stock_trader_df):
    """Compute net buying (buy minus sell) per trader group.

    Returns a frame with columns '기관', '외국인', '개인', each being
    ``<group>_매수 - <group>_매도`` from ``stock_trader_df``.
    """
    # (output column, buy column, sell column)
    column_triples = (
        ('기관', '기관_매수', '기관_매도'),
        ('외국인', '외국인_매수', '외국인_매도'),
        ('개인', '개인_매수', '개인_매도'),
    )
    net_columns = [
        (stock_trader_df[buy_col] - stock_trader_df[sell_col]).rename(out_col)
        for out_col, buy_col, sell_col in column_triples
    ]
    return pd.concat(net_columns, axis=1)

In [77]:
## Smoothing


def smooth_by_lowess(series, frac):
    """Smooth ``series`` with statsmodels' LOWESS and return it as a Series.

    The x-axis is the 1-based position of each observation, not the index
    values. ``frac`` is forwarded to ``lowess`` unchanged. The result keeps the
    index and name of ``series``.
    """
    from statsmodels.nonparametric.smoothers_lowess import lowess

    positions = np.arange(1, len(series) + 1)
    # lowess returns (x, fitted) pairs; column 1 holds the fitted values.
    fitted = lowess(series.to_numpy(), positions, frac=frac)
    return pd.Series(fitted[:, 1], index=series.index, name=series.name)


def smooth_by_spline(series, s):
    """Smooth ``series`` with a ``UnivariateSpline`` fitted on min-max scaled values.

    The values are scaled with ``MinMaxScaler`` before fitting, so ``s`` (the
    spline smoothing factor) applies to the scaled data; the fitted curve is
    then mapped back to the original scale. The x-axis is the 1-based
    position. The result keeps the index and name of ``series``.
    """
    from sklearn.preprocessing import MinMaxScaler
    from scipy.interpolate import UnivariateSpline

    positions = np.arange(1, len(series) + 1)

    # The scaler expects a 2-D column; flatten again for the spline.
    scaler = MinMaxScaler()
    values_column = series.to_numpy().reshape(-1, 1)
    scaled_values = scaler.fit_transform(values_column).flatten()

    fitted_scaled = UnivariateSpline(positions, scaled_values, s=s)(positions)
    fitted = scaler.inverse_transform(fitted_scaled.reshape(-1, 1)).flatten()
    return pd.Series(fitted, index=series.index, name=series.name)


def find_best_param_lowess(
    smooth_by_lowess,
    inflection_point_by_exrema,
    price_series,
    lowess_params=None,
    window=10,
    max_inflection_points=6,
):
    """Pick the first LOWESS parameter tuple that yields few enough inflection points.

    Candidates are tried in order. For each one the series is smoothed, the
    extrema of the smoothed values are located, and the search stops as soon
    as fewer than ``max_inflection_points`` extrema are found.

    Args:
        smooth_by_lowess: Callable ``(series, *param) -> Series``.
        inflection_point_by_exrema: Callable ``(ndarray, window) -> sized collection``.
        price_series: Series to smooth.
        lowess_params: Iterable of argument tuples to try, in order. Defaults
            to the ``frac`` grid 0.01, 0.05, 0.1, 0.2, ..., 0.8.
        window: Passed through as the second argument of
            ``inflection_point_by_exrema``.
        max_inflection_points: Exclusive upper bound on the accepted number of
            inflection points.

    Returns:
        The first accepted parameter tuple, or the last candidate when none
        of them is accepted.

    Raises:
        ValueError: If ``lowess_params`` is empty.
    """
    if lowess_params is None:
        lowess_params = [(0.01,), (0.05,), (0.1,), (0.2,), (0.3,), (0.4,), (0.5,), (0.6,), (0.7,), (0.8,)]
    candidates = list(lowess_params)
    if not candidates:
        raise ValueError('lowess_params must contain at least one candidate')

    for param in candidates:
        smoothed_series = smooth_by_lowess(price_series, *param)
        inflection_points = inflection_point_by_exrema(smoothed_series.to_numpy(), window)
        if len(inflection_points) < max_inflection_points:
            return param
    # No candidate was smooth enough: fall back to the last candidate tried.
    return candidates[-1]


def find_best_param_spline(
    smooth_by_spline,
    inflection_point_by_exrema,
    price_series,
    spline_params=None,
    window=10,
    max_inflection_points=6,
):
    """Pick the first spline parameter tuple that yields few enough inflection points.

    Candidates are tried in order. For each one the series is smoothed, the
    extrema of the smoothed values are located, and the search stops as soon
    as fewer than ``max_inflection_points`` extrema are found.

    Args:
        smooth_by_spline: Callable ``(series, *param) -> Series``.
        inflection_point_by_exrema: Callable ``(ndarray, window) -> sized collection``.
        price_series: Series to smooth.
        spline_params: Iterable of argument tuples to try, in order. Defaults
            to the smoothing-factor grid 1..10, 20, 30.
        window: Passed through as the second argument of
            ``inflection_point_by_exrema``.
        max_inflection_points: Exclusive upper bound on the accepted number of
            inflection points.

    Returns:
        The first accepted parameter tuple, or the last candidate when none
        of them is accepted.

    Raises:
        ValueError: If ``spline_params`` is empty.
    """
    if spline_params is None:
        spline_params = [(1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,), (10,), (20,), (30,)]
    candidates = list(spline_params)
    if not candidates:
        raise ValueError('spline_params must contain at least one candidate')

    for param in candidates:
        smoothed_series = smooth_by_spline(price_series, *param)
        inflection_points = inflection_point_by_exrema(smoothed_series.to_numpy(), window)
        if len(inflection_points) < max_inflection_points:
            return param
    # No candidate was smooth enough: fall back to the last candidate tried.
    return candidates[-1]

In [78]:
## Inflection points
def inflection_point_by_exrema(arr, window):
    """Return the sorted positions of the local maxima and minima of ``arr``.

    Extrema are located with ``scipy.signal.argrelextrema`` using strict
    comparisons (``np.greater`` / ``np.less``) and ``order=window``.
    The result is a list of positional indices in ascending order.
    """
    from scipy.signal import argrelextrema

    # Maxima first, then minima; the final sort interleaves the two groups.
    extrema_positions = [
        argrelextrema(arr, comparator, order=window)[0]
        for comparator in (np.greater, np.less)
    ]
    return sorted(np.concatenate(extrema_positions))

In [79]:
## cumsum
def get_split_cumsum(net_buy_df):
    """Compute cumulative sums over the full history and three trailing windows.

    The windows are anchored at the last index value and cover the trailing
    1 year, 6 months and 3 months (calendar offsets, inclusive of the cutoff).
    Each window's cumulative sum restarts from the first row of that window.

    Returns:
        Tuple ``(total, one_year, six_month, three_month)`` of cumulative-sum frames.
    """
    last_timestamp = net_buy_df.index[-1]

    def _cumsum_since(offset):
        # Keep only rows at or after the cutoff, then accumulate from there.
        in_window = net_buy_df.index >= (last_timestamp - offset)
        return net_buy_df.loc[in_window].cumsum()

    return (
        net_buy_df.cumsum(),
        _cumsum_since(pd.DateOffset(years=1)),
        _cumsum_since(pd.DateOffset(months=6)),
        _cumsum_since(pd.DateOffset(months=3)),
    )

In [80]:
## Plot
def plot_price_series(price_series, smoothed_series_list, inflection_points_list, labels, general_info):
    """Show the raw price with smoothed overlays and their inflection points.

    The raw series is drawn in black. Each smoothed series is drawn as a thick
    line, and its inflection points are marked on the RAW prices (positional
    lookup into ``price_series``). ``general_info`` is used as the title.
    """
    fig, ax = plt.subplots(figsize=(14, 6))
    ax.plot(price_series.index, price_series, label='RAW', c='k', alpha=0.5)

    # Only three colors are available, so zip() draws at most three overlays.
    overlays = zip(smoothed_series_list, inflection_points_list, labels, ['r', 'g', 'b'])
    for smoothed, positions, label, color in overlays:
        ax.plot(smoothed.index, smoothed, label=f'{label} Smoothed', c=color, linewidth=3, alpha=0.5)
        ax.scatter(
            price_series.index[positions],
            price_series.iloc[positions],
            label=f'{label} Inflection Point',
            s=200,
            alpha=0.7,
            c=color,
        )

    ax.legend(loc='upper left', fontsize=12)
    ax.set_title(general_info, fontsize=14)
    fig.tight_layout()
    plt.show()
    
def plot_cumsums(cumsums, general_info):
    """Show up to four cumulative-sum frames in a 2x2 grid.

    ``cumsums`` is an iterable of ``(title, frame)`` pairs; every column of a
    frame becomes one line in that frame's panel. ``general_info`` is the
    figure-level title.
    """
    fig = plt.figure(figsize=(18, 10))

    # Panels are numbered from 1 in a fixed 2x2 layout.
    for panel_no, (title, cumsum) in enumerate(cumsums, start=1):
        ax = fig.add_subplot(2, 2, panel_no)
        for col in cumsum.columns:
            ax.plot(cumsum.index, cumsum[col], label=col)
        ax.legend(loc='upper left', fontsize=12)
        ax.set_title(title, fontsize=12)

    fig.suptitle(general_info, fontsize=14)
    fig.tight_layout()
    plt.show()

In [81]:
# Run

In [82]:
## Load
k_finance_data_reader = KFinanceDataReader()

market_data_reader = k_finance_data_reader.MarketDataReader()
corp_data_reader = k_finance_data_reader.CorpDataReader(dart_api_key)

### Stock General Info
kospi_info_df = market_data_reader.get_kospi_info_df()
kosdaq_info_df = market_data_reader.get_kosdaq_info_df()
# Reuse the reader created above instead of constructing a second
# KFinanceDataReader/MarketDataReader pair just for this one call.
# krx_df supplies the 'Code' and 'Marcap' columns read by get_general_info.
krx_df = market_data_reader.finance_data_reader.StockListing('KRX')

### Stock Market Info
# Both frames cover the [start_date, end_date] window set in the parameters cell.
stock_ohlcv_df = market_data_reader.get_stock_ohlcv_df(
    stock_code=stock_code,
    start_date=start_date,
    end_date=end_date,
)

stock_trader_df = market_data_reader.get_stock_trader_df(
    stock_code=stock_code,
    start_date=start_date,
    end_date=end_date,
)

In [83]:
## Preproc
general_info = get_general_info(stock_code, kospi_info_df, kosdaq_info_df, krx_df)

# Closing prices over the full window plus the trailing 252 / 126 rows.
# NOTE(review): 252 and 126 presumably approximate one year and six months of
# trading days - confirm.
stock_ohlcv_df = preproc(stock_ohlcv_df)
price_series = get_series(stock_ohlcv_df, 'close')
recent_1_year_price_series = price_series.iloc[-252:]
recent_6_month_price_series = price_series.iloc[-126:]

# Net buying per trader group, then its cumulative sums per window.
trader_net_buy_df = preproc(calc_net_buy_df(stock_trader_df))
cumsums = get_split_cumsum(trader_net_buy_df)
net_buy_cumsums = list(
    zip(
        [
            'Net Buy Total Cumsum',
            'Last 1 Year Net Buy Cumsum',
            'Last 6 Month Net Buy Cumsum',
            'Last 3 Month Net Buy Cumsum',
        ],
        cumsums,
    )
)

In [84]:
## Smoothing
# The smoothing parameter is selected once on the full series and then reused
# unchanged for the 1-year and 6-month slices.
lowess_param = find_best_param_lowess(smooth_by_lowess, inflection_point_by_exrema, price_series)
(
    price_series_smoothed_by_lowess,
    price_series_smoothed_by_lowess_recent_1_year,
    price_series_smoothed_by_lowess_recent_6_month,
) = [
    smooth_by_lowess(window_series, *lowess_param)
    for window_series in (price_series, recent_1_year_price_series, recent_6_month_price_series)
]

spline_param = find_best_param_spline(smooth_by_spline, inflection_point_by_exrema, price_series)
(
    price_series_smoothed_by_spline,
    price_series_smoothed_by_spline_recent_1_year,
    price_series_smoothed_by_spline_recent_6_month,
) = [
    smooth_by_spline(window_series, *spline_param)
    for window_series in (price_series, recent_1_year_price_series, recent_6_month_price_series)
]

In [85]:
## Inflection points
# Extrema of every smoothed curve, all located with the same window of 10.
(
    inflection_points_by_lowess,
    inflection_points_1_year_by_lowess,
    inflection_points_6_month_by_lowess,
) = [
    inflection_point_by_exrema(smoothed.to_numpy(), 10)
    for smoothed in (
        price_series_smoothed_by_lowess,
        price_series_smoothed_by_lowess_recent_1_year,
        price_series_smoothed_by_lowess_recent_6_month,
    )
]

(
    inflection_points_by_spline,
    inflection_points_1_year_by_spline,
    inflection_points_6_month_by_spline,
) = [
    inflection_point_by_exrema(smoothed.to_numpy(), 10)
    for smoothed in (
        price_series_smoothed_by_spline,
        price_series_smoothed_by_spline_recent_1_year,
        price_series_smoothed_by_spline_recent_6_month,
    )
]

In [86]:
def plot_price_series_to_pdf(price_series, smoothed_series_list, inflection_points_list, labels, figtext, general_info, pdf):
    """Render the price chart as one page of ``pdf`` instead of showing it.

    The raw series is drawn in black. Each smoothed series is drawn as a thick
    line, and its inflection points are marked on the RAW prices (positional
    lookup into ``price_series``). ``general_info`` is the title, ``figtext``
    is a caption centered at the bottom of the figure. The figure is saved via
    ``pdf.savefig()`` and then closed.
    """
    fig, ax = plt.subplots(figsize=(16, 5))
    ax.plot(price_series.index, price_series, label='RAW', c='k', alpha=0.5)

    # Only three colors are available, so zip() draws at most three overlays.
    overlays = zip(smoothed_series_list, inflection_points_list, labels, ['r', 'g', 'b'])
    for smoothed, positions, label, color in overlays:
        ax.plot(smoothed.index, smoothed, label=f'{label} Smoothed', c=color, linewidth=3, alpha=0.5)
        ax.scatter(
            price_series.index[positions],
            price_series.iloc[positions],
            label=f'{label} Inflection Point',
            s=200,
            alpha=0.7,
            c=color,
        )

    ax.legend(loc='upper left', fontsize=12)
    ax.set_title(general_info, fontsize=14)
    fig.text(0.5, 0.01, figtext, ha='center', fontsize=12)
    fig.tight_layout()
    # The freshly created figure is the current one, so savefig() picks it up.
    pdf.savefig()
    plt.close(fig)


def plot_cumsums_to_pdf(cumsums, general_info, pdf):
    """Render up to four cumulative-sum frames in a 2x2 grid as one page of ``pdf``.

    ``cumsums`` is an iterable of ``(title, frame)`` pairs; every column of a
    frame becomes one line in that frame's panel. ``general_info`` is the
    figure-level title. The figure is saved via ``pdf.savefig()`` and closed.
    """
    fig = plt.figure(figsize=(16, 10))

    # Panels are numbered from 1 in a fixed 2x2 layout.
    for panel_no, (title, cumsum) in enumerate(cumsums, start=1):
        ax = fig.add_subplot(2, 2, panel_no)
        for col in cumsum.columns:
            ax.plot(cumsum.index, cumsum[col], label=col)
        ax.legend(loc='upper left', fontsize=12)
        ax.set_title(title, fontsize=12)

    fig.suptitle(general_info, fontsize=14)
    fig.text(0.5, 0.01, "거래자 별 거래량 분석", ha='center', fontsize=12)
    fig.tight_layout()
    # The freshly created figure is the current one, so savefig() picks it up.
    pdf.savefig()
    plt.close(fig)

In [87]:
## To PDF
# The stock name is the text before the first ':' of the summary line.
stock_name = general_info.partition(':')[0].strip()

# One PDF per run: three price pages (full window, 1 year, 6 months)
# followed by the net-buy cumulative-sum page.
with PdfPages(f'{start_date}-{end_date}-{stock_name}.pdf') as pdf:
    # total year
    plot_price_series_to_pdf(
        price_series=price_series,
        smoothed_series_list=[price_series_smoothed_by_lowess, price_series_smoothed_by_spline],
        inflection_points_list=[inflection_points_by_lowess, inflection_points_by_spline],
        labels=['Lowess', 'Spline'],
        figtext=f'{n_year} 년 분석',
        general_info=general_info,
        pdf=pdf,
    )

    # recent 1 year
    plot_price_series_to_pdf(
        price_series=recent_1_year_price_series,
        smoothed_series_list=[price_series_smoothed_by_lowess_recent_1_year, price_series_smoothed_by_spline_recent_1_year],
        inflection_points_list=[inflection_points_1_year_by_lowess, inflection_points_1_year_by_spline],
        labels=['Lowess', 'Spline'],
        figtext='최근 1년 분석',
        general_info=general_info + '\n최근 1년 분석',
        pdf=pdf,
    )

    # recent 6 month
    plot_price_series_to_pdf(
        price_series=recent_6_month_price_series,
        smoothed_series_list=[price_series_smoothed_by_lowess_recent_6_month, price_series_smoothed_by_spline_recent_6_month],
        inflection_points_list=[inflection_points_6_month_by_lowess, inflection_points_6_month_by_spline],
        labels=['Lowess', 'Spline'],
        figtext='최근 6개월 분석',
        general_info=general_info + '\n최근 6개월 분석',
        pdf=pdf,
    )

    # trader net-buy cumulative sums
    plot_cumsums_to_pdf(cumsums=net_buy_cumsums, general_info=general_info, pdf=pdf)