In [None]:
!pip install pykrx statsmodels scikit-learn pandas numpy matplotlib

Collecting pykrx
  Downloading pykrx-1.0.51-py3-none-any.whl.metadata (61 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/61.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.1/61.1 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
Collecting datetime (from pykrx)
  Downloading datetime-6.0-py3-none-any.whl.metadata (34 kB)
Collecting deprecated (from pykrx)
  Downloading deprecated-1.3.1-py2.py3-none-any.whl.metadata (5.9 kB)
Collecting zope.interface (from datetime->pykrx)
  Downloading zope_interface-8.1.1-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (45 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.2/45.2 kB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
Downloading pykrx-1.0.51-py3-none-any.whl (2.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m14.0 MB/s[0m eta [36m0:00:00[0m
[

In [None]:
# Indicator Parameter Optimization for Daily Stock Classification
# - PyKRx daily OHLCV
# - Base candle features + optional SMA/RSI/MACD
# - Lag features
# - Time-series walk-forward evaluation
# - Supports binary / triple labels

import itertools
import warnings

import numpy as np
import pandas as pd
from pykrx import stock

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score, confusion_matrix, classification_report, f1_score
)
from sklearn.preprocessing import StandardScaler

# Indicator functions
def add_sma_features(df, windows=(5, 20, 60)):
    """Return a copy of ``df`` with simple-moving-average columns added.

    For every window ``w`` two columns are created from ``df['close']``:

    * ``sma_{w}``      -- rolling mean of the close over ``w`` rows
    * ``sma_{w}_dist`` -- percentage distance of the close from that mean

    The first ``w - 1`` rows of each pair are NaN because the rolling
    window is not yet full. The input frame is not modified.
    """
    out = df.copy()
    close = out['close']

    for span in windows:
        sma_col = f'sma_{span}'
        moving_avg = close.rolling(span).mean()
        out[sma_col] = moving_avg
        # 1e-9 guards against division by zero when the average is 0.
        out[f'{sma_col}_dist'] = (close - moving_avg) / (moving_avg + 1e-9) * 100

    return out


def add_macd_features(df, fast=12, slow=26, signal=9):
    """Return a copy of ``df`` with MACD columns derived from ``df['close']``.

    Added columns:

    * ``macd``        -- EMA(close, ``fast``) minus EMA(close, ``slow``)
    * ``macd_signal`` -- EMA of ``macd`` with span ``signal``
    * ``macd_hist``   -- ``macd`` minus ``macd_signal``

    All EMAs use ``adjust=False`` (recursive form). The input frame is
    not modified.
    """
    def _ema(series, span):
        # Recursive exponential moving average.
        return series.ewm(span=span, adjust=False).mean()

    out = df.copy()
    close = out['close']

    macd_line = _ema(close, fast) - _ema(close, slow)
    signal_line = _ema(macd_line, signal)

    out['macd'] = macd_line
    out['macd_signal'] = signal_line
    out['macd_hist'] = macd_line - signal_line
    return out


def add_rsi_features(df, period=14):
    """Return a copy of ``df`` with an ``rsi`` column computed from ``df['close']``.

    Gains and losses are the positive and negative parts of the
    row-to-row close difference. Both are smoothed with an exponential
    moving average using ``alpha = 1 / period`` and ``adjust=False``.
    The first row is NaN because it has no previous close.
    The input frame is not modified.
    """
    out = df.copy()
    change = out['close'].diff()
    smoothing = 1 / period

    up_moves = change.clip(lower=0)
    down_moves = -change.clip(upper=0)

    mean_up = up_moves.ewm(alpha=smoothing, adjust=False).mean()
    mean_down = down_moves.ewm(alpha=smoothing, adjust=False).mean()

    # 1e-9 avoids division by zero when there were no losses at all.
    relative_strength = mean_up / (mean_down + 1e-9)
    out['rsi'] = 100 - (100 / (1 + relative_strength))
    return out


# Base candle features
def add_base_features(df):
    """Return a copy of ``df`` with per-candle shape features added.

    Expects ``open``, ``high``, ``low``, ``close`` and ``volume`` columns.
    Added columns:

    * ``body``            -- |close - open| as a percentage of open
    * ``upper_shadow``    -- high minus the top of the body, % of open
    * ``lower_shadow``    -- bottom of the body minus low, % of open
    * ``body_ratio``      -- |close - open| divided by the high-low range
    * ``shadow_ratio``    -- (upper - lower shadow) over the range in % of open
    * ``direction``       -- sign of close - open (-1, 0 or 1)
    * ``volume_strength`` -- volume over its 5-row rolling mean
    * ``momentum``        -- percentage change of close versus the previous row

    Rows whose high equals low get NaN for the two ratio columns because
    the range is replaced by NaN before dividing.
    """
    eps = 1e-9
    out = df.copy()

    open_px = out['open']
    close_px = out['close']
    high_px = out['high']
    low_px = out['low']
    open_safe = open_px + eps

    body_top = out[['open', 'close']].max(axis=1)
    body_bottom = out[['open', 'close']].min(axis=1)

    out['body'] = ((close_px - open_px) / open_safe).abs() * 100
    out['upper_shadow'] = (high_px - body_top) / open_safe * 100
    out['lower_shadow'] = (body_bottom - low_px) / open_safe * 100

    # A zero range is turned into NaN so the ratios below come out as NaN, not inf.
    candle_range = (high_px - low_px).replace(0, np.nan)
    out['body_ratio'] = (close_px - open_px).abs() / (candle_range + eps)
    range_pct = (candle_range / open_safe) * 100
    out['shadow_ratio'] = (out['upper_shadow'] - out['lower_shadow']) / (range_pct + eps)

    out['direction'] = np.sign(close_px - open_px)

    volume = out['volume']
    out['volume_strength'] = volume / (volume.rolling(5).mean() + eps)

    prev_close = close_px.shift(1)
    out['momentum'] = (close_px - prev_close) / (prev_close + eps) * 100

    return out


# Labeling
def add_labels(df, threshold=0.3, label_mode="triple"):
    """Return a copy of ``df`` with next-row return labels attached.

    Added columns:

    * ``next_close`` -- ``close`` shifted up by one row
    * ``return``     -- percentage change from ``close`` to ``next_close``
    * ``label``      -- class derived from ``return`` (see below)

    Parameters
    ----------
    df : DataFrame with a ``close`` column.
    threshold : float
        Return magnitude (in percent) separating the classes.
    label_mode : {"triple", "binary"}
        * ``"triple"``: 0 if return < -threshold, 2 if return > threshold,
          1 otherwise.
        * ``"binary"``: 0 if return < -threshold, 1 if return > threshold,
          NaN for returns inside the band.

    In both modes a row whose return is NaN (the last row, which has no
    next close) gets a NaN label, so ``label`` is a float column and the
    caller is expected to drop NaN rows before training.

    Raises
    ------
    ValueError
        If ``label_mode`` is not ``"triple"`` or ``"binary"``.
    """
    # Validate first so an invalid mode fails before any work is done.
    if label_mode not in ("triple", "binary"):
        raise ValueError("label_mode must be 'triple' or 'binary'")

    df = df.copy()
    df['next_close'] = df['close'].shift(-1)
    df['return'] = (df['next_close'] - df['close']) / (df['close'] + 1e-9) * 100

    if label_mode == "triple":
        conditions = [
            df['return'] < -threshold,
            (df['return'] >= -threshold) & (df['return'] <= threshold),
            df['return'] > threshold
        ]
        # A NaN return matches none of the conditions. Without an explicit
        # default np.select would fill 0 and mislabel the row as "down".
        df['label'] = np.select(conditions, [0, 1, 2], default=np.nan)
    else:
        df['label'] = np.where(
            df['return'] < -threshold, 0,
            np.where(df['return'] > threshold, 1, np.nan)
        )

    return df


# Lag features
def add_lag_features(df, feature_cols, window_size=5):
    """Return a copy of ``df`` with lagged versions of ``feature_cols`` appended.

    For every column ``c`` in ``feature_cols`` and every lag ``i`` in
    ``1..window_size`` a column ``{c}_{i}_days_ago`` holding ``df[c].shift(i)``
    is added. Columns are ordered feature-major, then by increasing lag.
    The first ``i`` rows of a lag-``i`` column are NaN.

    All lag columns are built first and attached with a single concat.
    Inserting them one at a time fragments the frame and makes pandas emit
    a PerformanceWarning once enough columns have been added.

    The input frame is not modified. With an empty ``feature_cols`` or
    ``window_size < 1`` a plain copy is returned.
    """
    lagged = {
        f'{col}_{lag}_days_ago': df[col].shift(lag)
        for col in feature_cols
        for lag in range(1, window_size + 1)
    }
    if not lagged:
        return df.copy()

    # Drop any pre-existing column with a lag name so the result never
    # contains duplicate column labels.
    base = df.drop(columns=[name for name in lagged if name in df.columns])
    return pd.concat([base, pd.concat(lagged, axis=1)], axis=1)


# Data loader
def load_daily_ohlcv(start_date, end_date, ticker_code):
    """Fetch daily OHLCV rows through pykrx and give the columns English names.

    Calls ``stock.get_market_ohlcv_by_date(start_date, end_date, ticker_code)``,
    keeps only the five price/volume columns and renames them to
    ``open``, ``high``, ``low``, ``close``, ``volume`` (in that order).
    A new DataFrame is returned, carrying whatever index pykrx provides.
    """
    # Source column name -> name used by the rest of this notebook.
    column_map = {
        '시가': 'open',
        '고가': 'high',
        '저가': 'low',
        '종가': 'close',
        '거래량': 'volume',
    }

    raw = stock.get_market_ohlcv_by_date(start_date, end_date, ticker_code)
    prices = raw[list(column_map.keys())].copy()
    prices.columns = list(column_map.values())
    return prices


# Dataset builder with indicator params
def prepare_dataset_with_params(
    start_date="20220101",
    end_date="20241231",
    ticker_code="005930",
    window_size=5,
    threshold=0.3,
    label_mode="triple",
    use_sma=True,
    sma_windows=(5, 20, 60),
    use_rsi=True,
    rsi_period=14,
    use_macd=True,
    macd_fast=12,
    macd_slow=26,
    macd_signal=9,
):
    """Download OHLCV data and build a lag-feature classification dataset.

    Pipeline: load prices -> candle features -> optional SMA / MACD / RSI
    -> labels -> lag features -> drop rows containing any NaN.

    Parameters
    ----------
    start_date, end_date, ticker_code
        Passed to ``load_daily_ohlcv``.
    window_size : int
        Number of lags (1..window_size) created per feature. Must be >= 1.
    threshold, label_mode
        Passed to ``add_labels``.
    use_sma, use_rsi, use_macd : bool
        Toggle the corresponding indicator family.
    sma_windows, rsi_period, macd_fast, macd_slow, macd_signal
        Indicator parameters, used only when the family is enabled.

    Returns
    -------
    (df, X, y)
        ``df`` is the full cleaned frame. ``X`` contains only the
        ``*_days_ago`` columns, so same-row feature values are not part of
        the model input. ``y`` is the integer ``label`` column.

    Raises
    ------
    ValueError
        If ``window_size < 1`` or no price rows were returned.
    """
    if window_size < 1:
        # Without lags X would have zero columns and fail later with an
        # unrelated-looking error inside the scaler.
        raise ValueError("window_size must be >= 1 to build lag features")

    # Materialise once: the windows are iterated twice below, which would
    # silently break for a one-shot iterable such as a generator.
    sma_windows = tuple(sma_windows)

    df = load_daily_ohlcv(start_date, end_date, ticker_code)
    if df.empty:
        raise ValueError(
            f"no OHLCV rows returned for ticker {ticker_code} "
            f"between {start_date} and {end_date}"
        )

    # base
    df = add_base_features(df)

    # indicators
    if use_sma:
        df = add_sma_features(df, windows=sma_windows)
    if use_macd:
        df = add_macd_features(df, fast=macd_fast, slow=macd_slow, signal=macd_signal)
    if use_rsi:
        df = add_rsi_features(df, period=rsi_period)

    # labels
    df = add_labels(df, threshold=threshold, label_mode=label_mode)

    # columns that get lagged
    base_cols = [
        'body', 'upper_shadow', 'lower_shadow', 'body_ratio',
        'shadow_ratio', 'direction', 'volume_strength', 'momentum'
    ]

    indicator_cols = []
    if use_sma:
        for w in sma_windows:
            indicator_cols += [f'sma_{w}', f'sma_{w}_dist']
    if use_macd:
        indicator_cols += ['macd', 'macd_signal', 'macd_hist']
    if use_rsi:
        indicator_cols += ['rsi']

    feature_cols = base_cols + indicator_cols

    # lags
    df = add_lag_features(df, feature_cols, window_size=window_size)

    # Drop warm-up rows, the final row (unknown next close) and, in binary
    # mode, rows whose return fell inside the neutral band (NaN label).
    df = df.dropna()

    X = df.filter(regex='_days_ago$')
    # After dropna every label is a whole number. Cast in both modes so the
    # class dtype does not depend on how the label column was encoded.
    y = df['label'].astype(int)

    return df, X, y


# Walk-forward evaluation
def walk_forward_score(
    X, y,
    label_mode="triple",
    n_splits=4,
):
    """Score a logistic regression with an expanding-window walk-forward.

    With fewer than 200 rows a single 70/30 chronological split is used.
    Otherwise the rows are cut into ``n_splits + 1`` equal blocks; fold
    ``k`` trains on blocks ``0..k-1`` and tests on block ``k``. Test blocks
    shorter than 20 rows are skipped. Features are standardised with a
    scaler fitted on the training part of each fold only.

    Parameters
    ----------
    X : DataFrame of features, rows in chronological order.
    y : Series of class labels aligned with ``X``.
    label_mode : str
        Kept for backward compatibility. The same estimator is used for
        binary and multi-class labels.
    n_splits : int
        Number of test blocks. Must be >= 1.

    Returns
    -------
    (mean_macro_f1, mean_accuracy) : tuple of float
        ``(nan, nan)`` when no fold could be evaluated.

    Raises
    ------
    ValueError
        If ``n_splits < 1``.
    """
    if n_splits < 1:
        raise ValueError("n_splits must be >= 1")

    n = len(X)
    if n < 200:
        # too small -> fallback to simple last split
        split = int(n * 0.7)
        splits = [(0, split, split, n)]
    else:
        # build n_splits test blocks at the end
        # expanding train: [0:train_end], test: next block
        block = n // (n_splits + 1)
        splits = []
        for k in range(1, n_splits + 1):
            train_end = block * k
            test_start = train_end
            test_end = min(train_end + block, n)
            if test_end - test_start < 20:
                continue
            splits.append((0, train_end, test_start, test_end))

    f1s = []
    accs = []

    for tr_s, tr_e, te_s, te_e in splits:
        X_train = X.iloc[tr_s:tr_e]
        y_train = y.iloc[tr_s:tr_e]
        X_test = X.iloc[te_s:te_e]
        y_test = y.iloc[te_s:te_e]

        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # The deprecated `multi_class` argument is no longer passed: the
        # default solver already fits a multinomial model for 3+ classes.
        model = LogisticRegression(max_iter=3000)

        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)

        f1s.append(f1_score(y_test, y_pred, average="macro"))
        accs.append(accuracy_score(y_test, y_pred))

    if not f1s:
        # Every test block was too short; report "no score" explicitly
        # instead of letting np.mean([]) warn and return NaN implicitly.
        return float("nan"), float("nan")

    return float(np.mean(f1s)), float(np.mean(accs))


# Optimization
def optimize_indicators(
    ticker_code="005930",
    start_date="20220101",
    end_date="20241231",
    label_mode="binary",
    threshold=0.3,
    window_size=5,
    n_splits=4,
):
    """Grid-search indicator parameters and indicator combinations.

    Every non-empty combination of SMA / RSI / MACD is tried. For an
    enabled family all of its grid values are searched; a disabled family
    contributes a single placeholder value. Each configuration is built
    with ``prepare_dataset_with_params`` and scored with
    ``walk_forward_score``. Datasets with fewer than 100 rows are skipped.

    Note: the dataset is rebuilt (and the price data loaded again) for
    every configuration.

    Returns
    -------
    (best, results)
        ``best`` is the record dict with the highest mean macro F1, or
        ``None`` if nothing could be scored. ``results`` is a DataFrame of
        all records sorted by macro F1, accuracy and sample count
        (descending); it is empty, with the usual columns, when nothing
        was scored.

    Warns
    -----
    RuntimeWarning
        Once, after the search, if any configuration raised. The warning
        carries the failure count and the first error so that problems
        such as a failed download are not silently hidden.
    """

    # ---- Small, practical grids (start small!)
    sma_window_sets = [
        (3, 10, 30),
        (5, 20, 60),
        (10, 30, 90),
    ]
    rsi_periods = [7, 14, 21]
    macd_params = [
        (8, 20, 7),
        (12, 26, 9),
        (10, 30, 9),
    ]

    # indicator combinations to test: (use_sma, use_rsi, use_macd)
    combos = [
        (True, False, False),   # SMA only
        (False, True, False),   # RSI only
        (False, False, True),   # MACD only
        (True, True, False),    # SMA + RSI
        (True, False, True),    # SMA + MACD
        (False, True, True),    # RSI + MACD
        (True, True, True),     # SMA + RSI + MACD
    ]

    result_columns = [
        "use_sma", "use_rsi", "use_macd", "sma_windows", "rsi_period",
        "macd_fast", "macd_slow", "macd_signal", "window_size", "threshold",
        "label_mode", "mean_macro_f1", "mean_accuracy", "n_samples",
    ]

    records = []
    failures = []
    best = None
    best_f1 = -1

    for use_sma, use_rsi, use_macd in combos:
        # A disabled family contributes one placeholder value so it does
        # not multiply the number of runs.
        grid = itertools.product(
            sma_window_sets if use_sma else [(5, 20, 60)],
            rsi_periods if use_rsi else [14],
            macd_params if use_macd else [(12, 26, 9)],
        )

        for sma_windows, rsi_p, (macd_fast, macd_slow, macd_sig) in grid:
            try:
                df, X, y = prepare_dataset_with_params(
                    start_date=start_date,
                    end_date=end_date,
                    ticker_code=ticker_code,
                    window_size=window_size,
                    threshold=threshold,
                    label_mode=label_mode,
                    use_sma=use_sma,
                    sma_windows=sma_windows,
                    use_rsi=use_rsi,
                    rsi_period=rsi_p,
                    use_macd=use_macd,
                    macd_fast=macd_fast,
                    macd_slow=macd_slow,
                    macd_signal=macd_sig,
                )

                if len(X) < 100:
                    continue

                mean_f1, mean_acc = walk_forward_score(
                    X, y, label_mode=label_mode, n_splits=n_splits
                )
            except Exception as exc:
                # Best-effort search: remember the failure and move on.
                failures.append(exc)
                continue

            cfg = {
                "use_sma": use_sma,
                "use_rsi": use_rsi,
                "use_macd": use_macd,
                "sma_windows": sma_windows,
                "rsi_period": rsi_p,
                "macd_fast": macd_fast,
                "macd_slow": macd_slow,
                "macd_signal": macd_sig,
                "window_size": window_size,
                "threshold": threshold,
                "label_mode": label_mode,
                "mean_macro_f1": mean_f1,
                "mean_accuracy": mean_acc,
                "n_samples": len(X),
            }
            records.append(cfg)

            # A NaN score never compares greater, so it cannot become best.
            if mean_f1 > best_f1:
                best_f1 = mean_f1
                best = cfg

    if failures:
        warnings.warn(
            f"{len(failures)} indicator configuration(s) failed and were skipped; "
            f"first error: {failures[0]!r}",
            RuntimeWarning,
            stacklevel=2,
        )

    if not records:
        # Sorting an empty, column-less frame by name would raise KeyError.
        return None, pd.DataFrame(columns=result_columns)

    results = pd.DataFrame(records).sort_values(
        by=["mean_macro_f1", "mean_accuracy", "n_samples"],
        ascending=False
    ).reset_index(drop=True)

    return best, results


# Optional: train final with best config
def train_with_config(
    config,
    ticker_code="005930",
    start_date="20220101",
    end_date="20241231",
    train_ratio=0.7,
    show_report=True
):
    """Fit a logistic regression for one configuration and report holdout metrics.

    The dataset is rebuilt with ``prepare_dataset_with_params`` from the
    keys in ``config``. The first ``train_ratio`` share of rows (in
    chronological order) is used for training and the rest for testing.
    Features are standardised with a scaler fitted on the training rows only.

    Parameters
    ----------
    config : dict
        Must contain ``window_size``, ``threshold``, ``label_mode``,
        ``use_sma``, ``sma_windows``, ``use_rsi``, ``rsi_period``,
        ``use_macd``, ``macd_fast``, ``macd_slow`` and ``macd_signal``.
        Extra keys are ignored for fitting.
    ticker_code, start_date, end_date
        Passed to ``prepare_dataset_with_params``.
    train_ratio : float
        Share of rows used for training; must lie strictly between 0 and 1.
    show_report : bool
        When true, print the configuration, accuracy, macro F1, confusion
        matrix and classification report for the holdout rows.

    Returns
    -------
    dict with keys ``df``, ``X``, ``y``, ``model`` and ``scaler``.

    Raises
    ------
    ValueError
        If ``train_ratio`` is not strictly between 0 and 1.

    Note: if ``config`` was selected using the same date range, the holdout
    rows were already seen during selection and the metrics are optimistic.
    """
    if not 0 < train_ratio < 1:
        raise ValueError("train_ratio must be strictly between 0 and 1")

    df, X, y = prepare_dataset_with_params(
        start_date=start_date,
        end_date=end_date,
        ticker_code=ticker_code,
        window_size=config["window_size"],
        threshold=config["threshold"],
        label_mode=config["label_mode"],
        use_sma=config["use_sma"],
        sma_windows=config["sma_windows"],
        use_rsi=config["use_rsi"],
        rsi_period=config["rsi_period"],
        use_macd=config["use_macd"],
        macd_fast=config["macd_fast"],
        macd_slow=config["macd_slow"],
        macd_signal=config["macd_signal"],
    )

    n = len(X)
    split = int(n * train_ratio)
    X_train, X_test = X.iloc[:split], X.iloc[split:]
    y_train, y_test = y.iloc[:split], y.iloc[split:]

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # The deprecated `multi_class` argument is no longer passed: the default
    # solver already fits a multinomial model for 3+ classes.
    model = LogisticRegression(max_iter=3000)

    model.fit(X_train_scaled, y_train)
    y_pred = model.predict(X_test_scaled)

    if show_report:
        print("=" * 80)
        print("FINAL TRAIN WITH BEST CONFIG")
        # Hide the search scores; only the settings describe this fit.
        print({k: config[k] for k in config if k not in ["mean_macro_f1","mean_accuracy","n_samples"]})
        print(f"Holdout Accuracy: {accuracy_score(y_test, y_pred):.4f}")
        print(f"Holdout Macro F1: {f1_score(y_test, y_pred, average='macro'):.4f}")
        print(confusion_matrix(y_test, y_pred))
        print(classification_report(y_test, y_pred, digits=4))

    return {
        "df": df,
        "X": X,
        "y": y,
        "model": model,
        "scaler": scaler,
    }


# usage
if __name__ == "__main__":
    # 1) Search indicator settings for one ticker over a fixed date range.
    best, results = optimize_indicators(
        n_splits=4,
        window_size=5,
        threshold=0.3,
        label_mode="binary",   # or "triple"
        end_date="20241231",
        start_date="20220101",
        ticker_code="005930",
    )

    # 2) Show the winner and the head of the ranking.
    print("BEST CONFIG (by mean macro F1):", best, sep="\n")
    print("\nTOP 10 RESULTS:", results.head(10), sep="\n")

    # 3) Refit with the winning settings and print the holdout report.
    if best is not None:
        _ = train_with_config(
            best,
            train_ratio=0.7,
            end_date="20241231",
            start_date="20220101",
            ticker_code="005930",
        )


BEST CONFIG (by mean macro F1):
{'use_sma': True, 'use_rsi': False, 'use_macd': True, 'sma_windows': (10, 30, 90), 'rsi_period': 14, 'macd_fast': 12, 'macd_slow': 26, 'macd_signal': 9, 'window_size': 5, 'threshold': 0.3, 'label_mode': 'binary', 'mean_macro_f1': 0.5094447394982701, 'mean_accuracy': 0.5495283018867925, 'n_samples': 530}

TOP 10 RESULTS:
   use_sma  use_rsi  use_macd   sma_windows  rsi_period  macd_fast  macd_slow  \
0     True    False      True  (10, 30, 90)          14         12         26   
1     True    False      True  (10, 30, 90)          14         10         30   
2     True     True      True   (5, 20, 60)           7         12         26   
3    False    False      True   (5, 20, 60)          14         12         26   
4     True     True      True   (5, 20, 60)           7         10         30   
5     True    False      True  (10, 30, 90)          14          8         20   
6     True     True     False   (3, 10, 30)           7         12         26  