In [8]:
# Input and Output from Dataset
import pandas as pd
import numpy as np

In [9]:
# Generate input - output data

def generate_xy(num_samples=10):
    # x = np.random.uniform(-1, 1, num_samples) # Input random
    x = np.random.randint(-100, 100, num_samples) # Input random
    y = np.zeros(num_samples)
    
    y[0] = 0   # Inisialisasi
    
    for k in range(1, num_samples):
        y[k] = 1 / (1 + y[k-1]**2) + 0.25*x[k] - 0.3*x[k-1]
    
    df = pd.DataFrame({"x": x, "y": y})
    return df

df = generate_xy(30002)
print(df)

        x          y
0      34   0.000000
1     -53 -22.450000
2     -73  -2.348020
3     -97  -2.196466
4      98  53.771690
...    ..        ...
29997  48   3.301395
29998  42  -3.815961
29999  90   9.964261
30000 -77 -46.240029
30001 -71   5.350467

[30002 rows x 2 columns]


In [None]:
# Generate dataset from input output
df_lagged = pd.DataFrame({
    "x(t)"   : df["x"],
    "x(t-1)" : df["x"].shift(1),
    "x(t-2)" : df["x"].shift(2),
    "y(t-1)" : df["y"].shift(1),
    "y(t-2)" : df["y"].shift(2),
    "y(t)"   : df["y"]   # target
})  # 3 lagged inputs and 2 lagged outputs

df_lagged = df_lagged.dropna().reset_index(drop=True) # drop NaN values from first lagged rows

# Check the dataset
print(df_lagged.head())
print(df_lagged.shape)

   x(t)  x(t-1)  x(t-2)     y(t-1)     y(t-2)       y(t)
0   -73   -53.0    34.0 -22.450000   0.000000  -2.348020
1   -97   -73.0   -53.0  -2.348020 -22.450000  -2.196466
2    98   -97.0   -73.0  -2.196466  -2.348020  53.771690
3    56    98.0   -97.0  53.771690  -2.196466 -15.399654
4   -44    56.0    98.0 -15.399654  53.771690 -27.795801
(30000, 6)


In [None]:
class BPNN:
    def __init__(self, input_size, hidden_size, output_size, lr=0.001, seed=42):
        np.random.seed(seed)

        # struktur
        self.NI = input_size
        self.NH = hidden_size
        self.NO = output_size
        self.lr = lr

        # inisialisasi bobot
        self.v = np.random.rand(self.NI, self.NH)   # input → hidden
        self.vb = np.random.rand(self.NH)           # bias hidden
        self.w = np.random.rand(self.NH, self.NO)   # hidden → output
        self.wb = np.random.rand(self.NO)           # bias output

    # --- Activation functions ---
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_derivative(self, x):
        s = self.sigmoid(x)
        return s * (1 - s)

    # --- Feedforward ---
    def feedforward(self, x):
        # hidden
        self.z_in = np.dot(x, self.v) + self.vb
        self.z = self.sigmoid(self.z_in)

        # output (pakai linear, bisa diganti sigmoid/tanh kalau mau)
        self.y_in = np.dot(self.z, self.w) + self.wb
        self.y = self.y_in  

        return self.y

    # --- Backpropagation ---
    def backpropagation(self, x, t):
        # error output
        delta_y = (t - self.y)
        del_w = self.lr * np.outer(self.z, delta_y)
        del_wb = self.lr * delta_y

        # error hidden
        delta_zin = np.dot(delta_y, self.w.T)
        delta_z = delta_zin * self.z * (1 - self.z)  # sigmoid derivative
        del_v = self.lr * np.outer(x, delta_z)
        del_vb = self.lr * delta_z

        # update bobot
        self.w += del_w
        self.wb += del_wb
        self.v += del_v
        self.vb += del_vb

        return np.mean(delta_y**2)   # return MSE

    # --- Training loop ---
    def fit(self, X, T, epochs=1000, tol=0.01):
        history = []
        for epoch in range(epochs):
            mse = 0
            for x, t in zip(X, T):
                self.feedforward(x)
                mse += self.backpropagation(x, t)
            mse /= len(X)
            history.append(mse)
            if mse < tol:
                print(f"Training stopped at epoch {epoch}, MSE={mse:.6f}")
                break
        return history

    # --- Prediction ---
    def predict(self, X):
        outputs = []
        for x in X:
            outputs.append(self.feedforward(x))
        return np.array(outputs)


In [19]:
class BPNN:
    def __init__(self, input_size, hidden_size, output_size, lr=0.001, seed=42):
        np.random.seed(seed)

        # struktur
        self.NI = input_size
        self.NH = hidden_size
        self.NO = output_size
        self.lr = lr

        # inisialisasi bobot
        self.v = np.random.rand(self.NI, self.NH)   # input → hidden
        self.vb = np.random.rand(self.NH)           # bias hidden
        self.w = np.random.rand(self.NH, self.NO)   # hidden → output
        self.wb = np.random.rand(self.NO)           # bias output

    # --- Activation functions ---
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_derivative(self, x):
        s = self.sigmoid(x)
        return s * (1 - s)

    # --- Feedforward ---
    def feedforward(self, x):
        # hidden
        self.z_in = np.dot(x, self.v) + self.vb
        self.z = self.sigmoid(self.z_in)

        # output (pakai linear, bisa diganti sigmoid/tanh kalau mau)
        self.y_in = np.dot(self.z, self.w) + self.wb
        self.y = self.y_in  

        return self.y

    # --- Backpropagation ---
    def backpropagation(self, x, t):
        # error output
        delta_y = (t - self.y)
        del_w = self.lr * np.outer(self.z, delta_y)
        del_wb = self.lr * delta_y

        # error hidden
        delta_zin = np.dot(delta_y, self.w.T)
        delta_z = delta_zin * self.z * (1 - self.z)  # sigmoid derivative
        del_v = self.lr * np.outer(x, delta_z)
        del_vb = self.lr * delta_z

        # update bobot
        self.w += del_w
        self.wb += del_wb
        self.v += del_v
        self.vb += del_vb

        return np.mean(delta_y**2)   # return MSE

    # --- Training loop with validation ---
    def fit(self, X_train, T_train, X_val=None, T_val=None,
            epochs=1000, tol=0.01, patience=20):
        history = {"train_loss": [], "val_loss": []}
        best_val_loss = float("inf")
        patience_ctr = 0

        for epoch in range(epochs):
            mse = 0
            # training
            for x, t in zip(X_train, T_train):
                self.feedforward(x)
                mse += self.backpropagation(x, t)
            mse /= len(X_train)
            history["train_loss"].append(mse)

            # validation (no weight update)
            if X_val is not None and T_val is not None:
                val_loss = 0
                for x, t in zip(X_val, T_val):
                    y_pred = self.feedforward(x)
                    val_loss += np.mean((t - y_pred)**2)
                val_loss /= len(X_val)
                history["val_loss"].append(val_loss)

                # early stopping check
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_ctr = 0
                else:
                    patience_ctr += 1
                    if patience_ctr >= patience:
                        print(f"Early stopping at epoch {epoch}, val_loss={val_loss:.6f}")
                        break

            # tolerance check (on training loss)
            if mse < tol:
                print(f"Training stopped at epoch {epoch}, train_loss={mse:.6f}")
                break

        return history

    # --- Prediction ---
    def predict(self, X):
        outputs = []
        for x in X:
            outputs.append(self.feedforward(x))
        return np.array(outputs)


In [20]:
# dataset
X = df_lagged[["x(t)", "x(t-1)", "x(t-2)", "y(t-1)", "y(t-2)"]].values   # 100 sampel, input size=6
T = df_lagged[["y(t)"]].values    # target

# check size of dataset
print(X.shape, T.shape)

(30000, 5) (30000, 1)


In [None]:
def split_dataset(X, T, train=0.7, val=0.15, test=0.15, seed=None):
    """
    Membagi dataset menjadi train, validation, dan test set.

    Parameters
    ----------
    X : np.ndarray
        Fitur/input dataset.
    T : np.ndarray
        Target/label dataset.
    train, val, test : float
        Proporsi pembagian dataset (jumlahnya harus 1).
    seed : int or None
        Random seed untuk hasil konsisten.

    Returns
    -------
    (X_train, T_train, X_val, T_val, X_test, T_test)
    """
    assert abs(train + val + test - 1.0) < 1e-8, "Proporsi harus berjumlah 1"

    n_total = len(X)
    indices = np.arange(n_total)

    if seed is not None:
        np.random.seed(seed)
    np.random.shuffle(indices)

    n_train = int(train * n_total)
    n_val   = int(val * n_total)
    n_test  = n_total - n_train - n_val

    train_idx = indices[:n_train]
    val_idx   = indices[n_train:n_train+n_val]
    test_idx  = indices[n_train+n_val:]

    return (X[train_idx], T[train_idx],
            X[val_idx],  T[val_idx],
            X[test_idx], T[test_idx])


X_train, T_train, X_val, T_val, X_test, T_test = split_dataset(X, T, 0.7, 0.15, 0.15, seed=42)

print("Train:", X_train.shape, T_train.shape)
print("Val  :", X_val.shape, T_val.shape)
print("Test :", X_test.shape, T_test.shape)

In [21]:
def split_dataset_seq(X, T):
    """
    Membagi dataset 30.000 sample secara berurutan:
    - Train: 15.000 pertama
    - Val: 5.000 terakhir dari train + 5.000 setelah train
    - Test: 10.000 terakhir
    """
    assert len(X) == len(T), "Jumlah X dan T harus sama"
    assert len(X) == 30000, "Dataset harus 30.000 sample sesuai aturan"

    # Train 15.000 pertama
    X_train = X[:15000]
    T_train = T[:15000]

    # Validation = 5000 terakhir train + 5000 setelah train
    X_val = np.concatenate([X[10000:15000], X[15000:20000]])
    T_val = np.concatenate([T[10000:15000], T[15000:20000]])

    # Test = 10000 terakhir
    X_test = X[20000:30000]
    T_test = T[20000:30000]

    return X_train, T_train, X_val, T_val, X_test, T_test

X_train, T_train, X_val, T_val, X_test, T_test = split_dataset_seq(X, T)
print("Train:", X_train.shape, T_train.shape)
print("Val  :", X_val.shape, T_val.shape)
print("Test :", X_test.shape, T_test.shape)


Train: (15000, 5) (15000, 1)
Val  : (10000, 5) (10000, 1)
Test : (10000, 5) (10000, 1)


In [22]:
# buat model
model = BPNN(input_size=X_train.shape[1], hidden_size=128, output_size=1, lr=0.001)

# training
history = model.fit(X_train, T_train, X_val, T_val, epochs=1000, tol=1e-3, patience=20)

print("Last train loss:", history["train_loss"][-1])
print("Last val loss  :", history["val_loss"][-1])

  return 1 / (1 + np.exp(-x))


KeyboardInterrupt: 