In [None]:
# External libs
import numpy as np
import pandas as pd
from tqdm import tqdm

In [None]:
# Load Data

# Submission template; only read here, copies are modified later.
submission_raw = pd.read_csv("./data/raw_data/past_open/sample_submission.csv")

# Two price files that share one schema; they are stacked row-wise below.
datasets_df_raw_v1 = pd.read_csv("./data/raw_data/past_open/train.csv")
datasets_df_raw_v2 = pd.read_csv("./data/raw_data/train_additional.csv")

# Source (Korean) column name -> English column name used by the rest of the notebook.
column_dict = {
    "일자": "date",
    "종목코드": "ticker_code",
    "종목명": "ticker_name",
    "거래량": "volume",
    "시가": "open",
    "고가": "high",
    "저가": "low",
    "종가": "close",
}

# ignore_index=True gives the stacked frame one unique RangeIndex. Without it
# both files keep their own 0..n-1 labels, so index labels are duplicated.
# NOTE(review): rows are not de-duplicated; if the two files overlap in
# (date, ticker) the overlap is counted twice - confirm against the data.
datasets_df_raw = pd.concat(
    [datasets_df_raw_v1, datasets_df_raw_v2], axis=0, ignore_index=True
)

# Working copies so the raw frames stay untouched.
datasets_df = datasets_df_raw.copy()
submission_df = submission_raw.copy()

In [None]:
# General preprocessing
# 1. Column name mapping
def map_column_names(datasets_df, column_dict):
    """Return a copy of ``datasets_df`` with its columns renamed via ``column_dict``.

    Args:
        datasets_df: DataFrame whose columns should be renamed.
        column_dict: Mapping of current column name -> new column name.

    Returns:
        A new DataFrame; the input frame is left unmodified.

    Columns that are not keys of ``column_dict`` keep their name, so extra
    columns do not raise ``KeyError`` and calling the function again on an
    already-renamed frame is a no-op rather than a failure.
    """
    return datasets_df.rename(columns=column_dict)


# 2. Drop rows containing zero values
def drop_zero(datasets_df):
    """Keep only the rows in which volume and every OHLC price are non-zero.

    Args:
        datasets_df: DataFrame with "volume", "open", "low", "high" and
            "close" columns.

    Returns:
        A filtered DataFrame; the input frame is not modified.
    """
    checked_columns = ["volume", "open", "low", "high", "close"]
    # A row survives only if none of the checked columns equals zero.
    all_nonzero = (datasets_df[checked_columns] != 0).all(axis=1)
    return datasets_df[all_nonzero]


# 3. Drop tickers that lack sufficient data
class DROP_LACK_DATA:
    """Callable filter that removes tickers with too few rows.

    A ticker is kept when its number of non-null "date" values is strictly
    greater than ``percentage`` times the largest such count of any ticker.
    """

    def __init__(self, df, percentage) -> None:
        # df: frame with "ticker_code" and "date" columns.
        # percentage: fraction of the maximum per-ticker count used as threshold.
        self.df = df
        self.percentage = percentage

    def get_ticker_count_series(self):
        """Return a Series of non-null "date" counts indexed by ticker_code."""
        return self.df.groupby("ticker_code")["date"].count()

    def get_available_tickers(self, ticker_count_series):
        """Return the index of tickers whose count exceeds the threshold."""
        min_count = ticker_count_series.max() * self.percentage
        # Strict comparison: a ticker sitting exactly on the threshold is dropped.
        has_enough_rows = ticker_count_series > min_count
        return ticker_count_series[has_enough_rows].index

    def filter_available_tickers(self, available_tickers):
        """Return the rows of ``self.df`` whose ticker is in ``available_tickers``."""
        keep_mask = self.df["ticker_code"].isin(available_tickers)
        return self.df[keep_mask]

    def __call__(self):
        """Run count -> threshold -> filter and return the filtered frame."""
        counts = self.get_ticker_count_series()
        kept_tickers = self.get_available_tickers(counts)
        return self.filter_available_tickers(kept_tickers)

In [None]:
# Utils

# 1. Get tickers
def get_ticker_codes(datasets_df):
    """Return the distinct values of the "ticker_code" column as a sorted list."""
    unique_codes = list(set(datasets_df["ticker_code"]))
    unique_codes.sort()
    return unique_codes

# 2. Sort data by date
def sort_dataset_df(dataset_df, column):
    """Return a new DataFrame with rows ordered ascending by ``column``.

    The input frame is left in its original order.
    """
    return dataset_df.sort_values(by=column)

In [None]:
# Model preprocessing
# 1. Append price diff
def append_price_diff(dataset, open_col, close_col):
    """Add a "price_diff" column: (open - close) / open for each row.

    The value is positive when the close is below the open and negative when
    the close is above it.

    Args:
        dataset: DataFrame holding the two price columns.
        open_col: Name of the opening-price column.
        close_col: Name of the closing-price column.

    Returns:
        The same ``dataset`` object, modified in place with the new column.
    """
    open_prices = dataset[open_col]
    dataset["price_diff"] = (open_prices - dataset[close_col]) / open_prices
    return dataset

# 2. Get arraylist
def get_array_list(dataset, column):
    """Return the raw values (``Series.values``) of ``dataset[column]``."""
    selected_column = dataset[column]
    return selected_column.values

In [None]:
# Model format dataset

def get_x_y_dataset(arraylist, CFG):
    """Slice a series into consecutive (input window, output window) pairs.

    Sample ``k`` uses ``arraylist[k : k + input_window]`` as x and the
    ``output_window`` values that immediately follow as y.

    Args:
        arraylist: Sequence of values ordered along its first axis.
        CFG: Mapping providing integer "input_window" and "output_window".

    Returns:
        Tuple ``(x_dataset, y_dataset)`` of arrays shaped
        ``(n_samples, input_window, ...)`` and ``(n_samples, output_window, ...)``
        where ``n_samples = len(arraylist) - input_window - output_window + 1``.
        When the series is too short for a single sample, both arrays are
        returned with ``n_samples == 0`` but the window axis intact, so
        reductions such as ``y_dataset.sum(axis=1)`` still work.
    """
    i_window = CFG["input_window"]
    o_window = CFG["output_window"]

    values = np.asarray(arraylist)
    n_samples = len(values) - i_window - o_window + 1

    if n_samples <= 0:
        # Keep the window axis so callers can still reduce over axis=1.
        trailing_shape = values.shape[1:]
        return (
            np.empty((0, i_window) + trailing_shape, dtype=values.dtype),
            np.empty((0, o_window) + trailing_shape, dtype=values.dtype),
        )

    # Column vector of window start positions; adding a row of offsets
    # broadcasts to an (n_samples, window) matrix of positions to gather.
    starts = np.arange(n_samples)[:, None]
    x_dataset = values[starts + np.arange(i_window)]
    y_dataset = values[starts + i_window + np.arange(o_window)]
    return x_dataset, y_dataset

In [None]:
# Model
## Get similarity_score
def get_cosine_similarity(array_1, array_2):
    """Return the cosine similarity of two 1-D vectors.

    Args:
        array_1: First vector.
        array_2: Second vector, same length as ``array_1``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``. If either vector has zero norm the
        similarity is undefined; ``0.0`` is returned instead of letting the
        0/0 division produce NaN and a RuntimeWarning.
    """
    norm_product = np.linalg.norm(array_1) * np.linalg.norm(array_2)
    if norm_product == 0:
        return 0.0
    return np.dot(array_1, array_2) / norm_product


def get_similarity_df(x_dataset, y_dataset, final_x):
    """Score every historical window against ``final_x``.

    Args:
        x_dataset: Iterable of input windows.
        y_dataset: Iterable of targets, paired positionally with ``x_dataset``.
        final_x: Query window that each input window is compared with.

    Returns:
        DataFrame with one row per window and columns "similarity_score"
        (cosine similarity to ``final_x``) and "actual_y" (the paired target).
        Both columns exist even when there are no windows.
    """
    similarity_results = [
        {
            "similarity_score": get_cosine_similarity(x_data, final_x),
            "actual_y": y_data,
        }
        for x_data, y_data in zip(x_dataset, y_dataset)
    ]
    similarity_df = pd.DataFrame(
        similarity_results, columns=["similarity_score", "actual_y"]
    )
    # An empty frame would otherwise get an object-dtype score column, which
    # numeric selections such as nlargest reject.
    return similarity_df.astype({"similarity_score": float})


def get_similarity_main_df(x_dataset, y_dataset, final_x, n):
    """Return the ``n`` windows most similar to ``final_x``.

    Same arguments as ``get_similarity_df`` plus ``n``, the number of rows to
    keep. Rows are ordered by descending "similarity_score" and keep their
    original positional index labels.
    """
    similarity_df = get_similarity_df(x_dataset, y_dataset, final_x)
    return similarity_df.nlargest(n, "similarity_score")


def get_pred_y(similarity_df):
    """Return the mean of ``similarity_score * actual_y`` over all rows.

    NOTE(review): the products are averaged over the row count and not
    divided by the sum of the scores, so this is not a normalised weighted
    average - confirm that is intended.
    """
    scores = similarity_df["similarity_score"]
    targets = similarity_df["actual_y"]
    weighted_targets = scores * targets
    return weighted_targets.mean()

In [None]:
# Window sizes, counted in rows of one ticker's date-sorted series.
CFG = dict(
    # The most recent (dataset_window + input_window) rows are kept per ticker.
    dataset_window=300,
    # Length of each input (x) window and of the final query window.
    input_window=20,
    # Number of rows following an input window that form its target (y).
    output_window=15,
)

In [None]:
# ---------------------------------------------------------------------------
# General preprocessing
# ---------------------------------------------------------------------------
# Tickers must have more than this fraction of the largest per-ticker row count.
MIN_HISTORY_RATIO = 0.8
# Number of most-similar historical windows blended into each prediction.
TOP_K_SIMILAR = 3

# Start from a fresh copy of the raw frame so this cell can be re-run:
# renaming an already-renamed frame would fail on the English column names.
datasets_df = map_column_names(datasets_df_raw.copy(), column_dict)
datasets_df = drop_zero(datasets_df)
datasets_df = DROP_LACK_DATA(datasets_df, MIN_HISTORY_RATIO)()

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
ticker_codes = get_ticker_codes(datasets_df)

ticker_pred_dict = dict()
# One groupby pass splits the frame per ticker instead of re-scanning the
# whole frame with a boolean mask for every ticker.
for ticker_code, dataset_df in tqdm(
    datasets_df.groupby("ticker_code"), total=len(ticker_codes)
):
    # Utils: chronological order, then the per-row (open - close) / open feature.
    dataset_df = sort_dataset_df(dataset_df, "date")
    dataset_df = append_price_diff(dataset_df, "open", "close")

    # Model preprocessing: keep only the most recent part of the series.
    price_diff_arraylist = get_array_list(dataset_df, "price_diff")
    price_diff_arraylist = price_diff_arraylist[
        -(CFG["dataset_window"] + CFG["input_window"]) :
    ]

    # Model format dataset: historical windows with the summed price_diff over
    # the following output window as target; the latest window is the query.
    x_dataset, y_dataset = get_x_y_dataset(price_diff_arraylist, CFG)
    y_dataset = y_dataset.sum(axis=1)
    final_x = price_diff_arraylist[-CFG["input_window"] :]

    # Model: blend the targets of the windows most similar to the query.
    # NOTE(review): price_diff is (open - close) / open, so a larger pred_y
    # means a larger predicted drop, yet make_submission_df gives rank 1 to
    # the largest score - confirm this ordering is intended.
    similarity_main_df = get_similarity_main_df(
        x_dataset, y_dataset, final_x, TOP_K_SIMILAR
    )
    pred_y = get_pred_y(similarity_main_df)

    ticker_pred_dict[ticker_code] = pred_y

In [None]:
def make_submission_df(submission, ticker_score_dict):
    """Rank tickers by score and return the two submission columns.

    Args:
        submission: DataFrame with a "종목코드" (ticker code) column. It is
            modified in place: "score" and "순위" (rank) columns are added.
        ticker_score_dict: Mapping of ticker code -> score. Tickers missing
            from the mapping receive a score of 0.

    Returns:
        A DataFrame with the columns "종목코드" and "순위". Rank 1 goes to the
        highest score; ties are broken by row order.
    """
    mapped_scores = submission["종목코드"].map(ticker_score_dict)
    submission["score"] = mapped_scores.fillna(0)

    # method="first" yields distinct ranks 1..n, so the integer cast is safe.
    descending_rank = submission["score"].rank(method="first", ascending=False)
    submission["순위"] = descending_rank.astype(int)

    return submission[["종목코드", "순위"]]



In [None]:
# Work on a copy: make_submission_df adds columns to the frame it is given.
submission = submission_raw.copy()
submission_result = make_submission_df(submission, ticker_pred_dict)

# index=False keeps the file to the two result columns; the default would
# prepend the DataFrame index as an extra unnamed column.
submission_result.to_csv(
    f"./data/final_result/final_similarity_{CFG['dataset_window']}_{CFG['input_window']}_{CFG['output_window']}.csv",
    index=False,
)