In [1]:
# === Import Library ===
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

from sklearn.preprocessing import MinMaxScaler, RobustScaler
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization, Bidirectional, Input
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from deap import base, creator, tools, algorithms
import joblib

In [2]:
# === Load and preprocess the USD/IDR price data ===
# Read the CSV with 'Date' parsed as the datetime index, then append three
# indicators derived from the closing price in a single chained step:
#   MA5  - 5-row rolling mean of Close
#   MA10 - 10-row rolling mean of Close
#   ROC5 - fractional change of Close over 5 rows
df = pd.read_csv(
    'usd_idr_processed_data.csv',
    parse_dates=['Date'],
    index_col='Date',
).assign(
    MA5=lambda frame: frame['Close'].rolling(5).mean(),
    MA10=lambda frame: frame['Close'].rolling(10).mean(),
    ROC5=lambda frame: frame['Close'].pct_change(5),
)

# Additional technical feature: Relative Strength Index (RSI)
def compute_rsi(series, period=14):
    """Return the RSI of `series` using simple rolling means.

    Parameters
    ----------
    series : pandas.Series
        Values whose row-to-row differences drive the indicator.
    period : int, default 14
        Rolling-window length used for the average gain and average loss.

    Returns
    -------
    pandas.Series
        ``100 - 100 / (1 + avg_gain / avg_loss)``, aligned with `series`.
        Rows without a full window are NaN, as are windows in which both
        averages are zero (0 / 0).
    """
    change = series.diff()

    # Keep only upward (resp. downward) moves; everything else, including the
    # leading NaN difference, is replaced by 0 before averaging.
    upward = change.where(change > 0, 0)
    downward = change.where(change < 0, 0)

    avg_gain = upward.rolling(window=period).mean()
    avg_loss = -(downward.rolling(window=period).mean())

    relative_strength = avg_gain / avg_loss
    return 100 - (100 / (1 + relative_strength))

df['RSI'] = compute_rsi(df['Close'])
# The rolling indicators are NaN until their windows fill; drop those rows.
# Reassigning (instead of inplace=True) keeps the step explicit.
df = df.dropna()

features = ['Open', 'High', 'Low', 'Close', 'MA5', 'MA10', 'ROC5', 'RSI']
data = df[features].values

# Fit the scaler on the leading (training) part of the series only, then apply
# it to every row. Fitting on the full series would let statistics from the
# held-out period leak into the training inputs. The fraction matches the
# chronological 80/20 train/test split applied to the sliding windows.
TRAIN_FRACTION = 0.8
n_fit_rows = int(len(data) * TRAIN_FRACTION)
scaler = RobustScaler()
scaler.fit(data[:n_fit_rows])
data_scaled = scaler.transform(data)

# === Label-distribution visualisation (filled once the labels are built) ===
all_labels = []

In [3]:
# === Evolutionary Algorithm (EA) setup ===
# Single source of truth for the chromosome: (gene name, low, high), both
# bounds inclusive, in chromosome order. The random generators and the
# mutation operator are both derived from this table so they cannot drift
# apart.
#   units   - LSTM unit count
#   opt_idx - index into ['adam', 'rmsprop']
#   lr_exp  - learning rate is 10 ** (-lr_exp)
#   epochs  - number of training epochs
GENE_BOUNDS = [
    ("units", 50, 200),
    ("opt_idx", 0, 1),
    ("lr_exp", 2, 5),
    ("epochs", 10, 50),
]

# creator.create() defines classes on the shared `deap.creator` module.
# Guard the calls so re-running this cell does not redefine them (which
# makes DEAP emit a warning and replaces the classes of existing individuals).
if not hasattr(creator, "FitnessMax"):
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
    creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()

# One random integer generator per gene, registered under the gene's name.
for gene_name, gene_low, gene_high in GENE_BOUNDS:
    toolbox.register(gene_name, random.randint, gene_low, gene_high)

# An individual is one draw from each gene generator, in table order.
toolbox.register("individual", tools.initCycle, creator.Individual,
                 tuple(getattr(toolbox, name) for name, _, _ in GENE_BOUNDS), n=1)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

toolbox.register("mate", tools.cxTwoPoint)
# Each gene is redrawn uniformly within its own bounds with probability indpb.
toolbox.register("mutate", tools.mutUniformInt,
                 low=[low for _, low, _ in GENE_BOUNDS],
                 up=[high for _, _, high in GENE_BOUNDS], indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)

In [4]:
# === Per-window-size hyperparameter search (EA) and final training ===
window_sizes = [10, 15, 20, 25, 30]
results = []
# Reset the shared label collector so re-running this cell does not
# double-count labels in the distribution plot.
all_labels.clear()

# Unscaled closing prices, row-aligned with data_scaled. The up/down labels
# must be computed on real prices: the scaled series is centred near zero, so
# a percentage change on it is meaningless and can blow up.
close_raw = data[:, features.index('Close')]


def create_model(units, optimizer_name, lr):
    """Build and compile the 3-class LSTM classifier.

    The input shape is (window_size, len(features)), where `window_size` is
    read from the cell scope at call time (the window currently being
    processed by the loop below).

    Parameters
    ----------
    units : int
        Units of the first (bidirectional) LSTM; the following LSTM layers
        use units // 2 and units // 4.
    optimizer_name : str
        'adam' selects Adam; any other value selects RMSprop.
    lr : float
        Learning rate passed to the optimizer.

    Returns
    -------
    A compiled Keras Sequential model with a 3-way softmax output.
    """
    model = Sequential()
    model.add(Input(shape=(window_size, len(features))))
    model.add(Bidirectional(LSTM(units, return_sequences=True)))
    model.add(Dropout(0.3))
    model.add(BatchNormalization())
    model.add(LSTM(units // 2, return_sequences=True, activation='relu'))
    model.add(Dropout(0.3))
    model.add(LSTM(units // 4, activation='relu'))
    model.add(Dense(32, activation='relu'))
    model.add(Dropout(0.3))
    model.add(Dense(3, activation='softmax'))
    optimizer = Adam(learning_rate=lr) if optimizer_name == 'adam' else RMSprop(learning_rate=lr)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model


def eval_fitness(ind):
    """EA fitness: best validation accuracy reached by one candidate.

    `ind` is (units, opt_idx, lr_exp, epochs). The candidate is trained on
    X_fit / y_fit and scored on X_val / y_val, both taken from the cell scope
    for the current window size. The test split is deliberately NOT used
    here, so the accuracy reported after the search stays unbiased.

    Returns a 1-tuple, as DEAP expects for a single-objective fitness.
    """
    units, opt_idx, lr_exp, epochs = ind
    opt_name = ['adam', 'rmsprop'][opt_idx]
    lr = 10 ** (-lr_exp)
    model = create_model(units, opt_name, lr)
    early = EarlyStopping(patience=3, restore_best_weights=True)
    hist = model.fit(X_fit, y_fit, epochs=epochs, batch_size=16, verbose=0,
                     validation_data=(X_val, y_val), callbacks=[early], class_weight=class_weights)
    return max(hist.history['val_accuracy']),


toolbox.register("evaluate", eval_fitness)

for window_size in window_sizes:
    print(f"=== Window Size: {window_size} ===")

    # Build the sliding-window dataset.
    X, y = [], []
    threshold_pct = 0.005

    for i in range(len(data_scaled) - window_size - 1):
        X.append(data_scaled[i:i+window_size])
        # Label = direction of the raw close between the two rows that follow
        # the window (rows i+window_size and i+window_size+1).
        c0 = close_raw[i+window_size]
        c1 = close_raw[i+window_size+1]
        pct = (c1 - c0) / c0 if c0 != 0 else 0
        if pct > threshold_pct:
            y.append(2)   # up
        elif pct < -threshold_pct:
            y.append(0)   # down
        else:
            y.append(1)   # flat
    all_labels.extend(y)

    X = np.array(X)
    y = to_categorical(y, num_classes=3)

    # Chronological 80/20 train/test split.
    split = int(len(X) * 0.8)
    X_train, X_test = X[:split], X[split:]
    y_train, y_test = y[:split], y[split:]

    # Most recent 20% of the training windows are held out as validation data
    # for early stopping and EA fitness; the test split is only touched once,
    # for the final score.
    val_split = int(split * 0.8)
    X_fit, X_val = X_train[:val_split], X_train[val_split:]
    y_fit, y_val = y_train[:val_split], y_train[val_split:]

    # Balanced class weights. compute_class_weight expects `classes` as an
    # ndarray and rejects classes absent from y, so compute weights for the
    # classes actually present and default the rest to 1.0.
    train_labels = np.argmax(y_train, axis=1)
    present_classes = np.unique(train_labels)
    weights = compute_class_weight('balanced', classes=present_classes, y=train_labels)
    class_weights = {cls: 1.0 for cls in range(3)}
    class_weights.update({int(cls): float(w) for cls, w in zip(present_classes, weights)})

    pop = toolbox.population(n=10)
    NGEN = 5
    for gen in range(NGEN):
        offspring = algorithms.varAnd(pop, toolbox, cxpb=0.5, mutpb=0.2)
        # Only (re)train individuals whose fitness was invalidated by
        # crossover/mutation (or never computed); unchanged clones keep theirs.
        invalid = [ind for ind in offspring if not ind.fitness.valid]
        for ind, fit in zip(invalid, map(toolbox.evaluate, invalid)):
            ind.fitness.values = fit
        pop = toolbox.select(offspring, k=len(pop))

    best = tools.selBest(pop, k=1)[0]
    units, opt_idx, lr_exp, epochs = best
    opt_name = ['adam', 'rmsprop'][opt_idx]
    lr = 10 ** (-lr_exp)

    # Retrain the winning configuration on the full training split and score
    # it once on the untouched test split.
    model = create_model(units, opt_name, lr)
    model.fit(X_train, y_train, epochs=epochs, batch_size=16, verbose=0, class_weight=class_weights)
    _, acc = model.evaluate(X_test, y_test, verbose=0)

    results.append((window_size, acc, units, opt_name, lr, epochs, model, X_test, y_test))
    print(f"Window: {window_size}, Akurasi: {acc:.4f}, Units: {units}, Opt: {opt_name}, LR: {lr}, Epochs: {epochs}")


=== Window Size: 10 ===


  pct = (c1 - c0) / c0
  super().__init__(**kwargs)


KeyboardInterrupt: 

In [None]:
# Class distribution of the labels collected while building the windows.
# all_labels is extended once per window size, so the counts are pooled over
# every window size. `order` pins the x-axis to Down / Flat / Up instead of
# whatever order the classes first appear in the data.
class_names = ['Turun', 'Stasioner', 'Naik']
sns.countplot(x=[class_names[i] for i in all_labels], order=class_names)
plt.title('Distribusi Label Kelas')
plt.xlabel('Kelas')
plt.ylabel('Jumlah')
plt.show()

# `results` is empty if the search cell was interrupted or never run; fail
# with a clear message instead of max()'s "empty sequence" ValueError.
if not results:
    raise RuntimeError(
        "No training results available: run the window-size search cell to completion first."
    )

# Each entry is (window_size, acc, units, opt_name, lr, epochs, model, X_test, y_test);
# pick the one with the highest test accuracy.
best_result = max(results, key=lambda x: x[1])
print("\n=== Hasil Terbaik ===")
print(f"Window Size: {best_result[0]}")
print(f"Akurasi: {best_result[1]:.4f}")
print(f"Arsitektur: {best_result[2]} Units, Optimizer: {best_result[3]}, LR: {best_result[4]}, Epochs: {best_result[5]}")

best_model = best_result[6]
X_test_best = best_result[7]
y_test_best = best_result[8]

# Persist the fitted scaler and the best model to disk.
joblib.dump(scaler, "scaler_usdidr.pkl")
best_model.save("model_usdidr_final.h5")

In [None]:
# Compare predicted and actual class indices on the best model's test windows.
y_pred = best_model.predict(X_test_best)
y_true_labels = np.argmax(y_test_best, axis=1)   # one-hot -> class index
y_pred_labels = np.argmax(y_pred, axis=1)        # probabilities -> class index

# Plot the first 100 samples of each series on a shared axis.
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(y_true_labels[:100], label='Aktual')
ax.plot(y_pred_labels[:100], label='Prediksi')
ax.set_title('Prediksi vs Aktual (Sample 100)')
ax.legend()
plt.show()


In [None]:
# --- Trend prediction for the most recent window -------------------------
best_window = best_result[0]      # window size of the best configuration
n_features = len(features)
labels = ['Turun', 'Stasioner', 'Naik']

latest_window = data_scaled[-best_window:].reshape(1, best_window, n_features)
pred_prob = best_model.predict(latest_window)
pred_class = np.argmax(pred_prob)
print("\nPrediksi tren data terbaru:", labels[pred_class])

# --- Naive multi-step roll-forward ----------------------------------------
# NOTE(review): the model outputs a class, not a feature row, so nothing
# predicted is fed back. Each step only shifts the window by one row and
# repeats the last observed row, so steps after the first run on partly
# fabricated inputs. Treat the result as a rough illustration.
future_steps = 5
last_window = data_scaled[-best_window:].copy()
predictions = []

for _ in range(future_steps):
    input_window = last_window.reshape(1, best_window, n_features)
    pred = best_model.predict(input_window)
    pred_class = np.argmax(pred)
    predictions.append(labels[pred_class])
    # Drop the oldest row, then duplicate the newest row into the freed slot.
    last_window = np.roll(last_window, -1, axis=0)
    last_window[-1] = last_window[-2]

print("\nPrediksi multi-langkah ke depan:", predictions)