In [18]:
import numpy as np
import cupy as cp
from sklearn.preprocessing import StandardScaler
import pandas as pd
import os

class ShallowNeuralNetwork:
    """One-hidden-layer tanh network trained with Adam on CuPy arrays.

    Samples are stored column-wise: inputs are ``(input_size, m)`` and
    targets/outputs are ``(output_size, m)``, where ``m`` is the batch size.
    Both the hidden layer and the output layer use ``tanh``.
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
        """Store the layer sizes and Adam hyper-parameters, then build the weights.

        Args:
            input_size: Number of input features.
            hidden_size: Number of hidden units.
            output_size: Number of output units.
            learning_rate: Adam step size.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.learning_rate = learning_rate
        # Adam hyper-parameters and step counter.
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.epsilon = 1e-8
        self.t = 0

        self.W1, self.b1, self.W2, self.b2 = self.initialize_weights()
        self.initialize_adam_parameters()

    def initialize_weights(self):
        """Return freshly initialised ``(W1, b1, W2, b2)``.

        Weights are small Gaussian values (scaled by 0.01); biases are zero.
        """
        W1 = cp.random.randn(self.hidden_size, self.input_size) * 0.01
        # W2 multiplies the hidden activations, so its second dimension must be
        # hidden_size; using input_size here made forward() fail whenever
        # hidden_size != input_size.
        W2 = cp.random.randn(self.output_size, self.hidden_size) * 0.01

        b1 = cp.zeros((self.hidden_size, 1))
        b2 = cp.zeros((self.output_size, 1))
        return W1, b1, W2, b2

    def initialize_adam_parameters(self):
        """Zero the first (m*) and second (v*) Adam moments for every parameter."""
        self.mW1, self.vW1 = cp.zeros_like(self.W1), cp.zeros_like(self.W1)
        self.mb1, self.vb1 = cp.zeros_like(self.b1), cp.zeros_like(self.b1)
        self.mW2, self.vW2 = cp.zeros_like(self.W2), cp.zeros_like(self.W2)
        self.mb2, self.vb2 = cp.zeros_like(self.b2), cp.zeros_like(self.b2)

    def forward(self, x):
        """Run a forward pass on ``x`` of shape ``(input_size, m)``.

        The intermediate values ``z1``, ``a1``, ``z2`` and ``output`` are cached
        on the instance because ``backward`` reads them.

        Returns:
            The ``(output_size, m)`` network output.
        """
        self.z1 = cp.dot(self.W1, x) + self.b1
        self.a1 = cp.tanh(self.z1)
        self.z2 = cp.dot(self.W2, self.a1) + self.b2
        self.output = cp.tanh(self.z2)
        return self.output

    def predict(self, X):
        """Return the network output for ``X``; overwrites the cached activations."""
        return self.forward(X)

    def backward(self, x, y):
        """Back-propagate from the most recent ``forward`` call and apply one Adam step.

        Args:
            x: The batch that was just passed to ``forward``, ``(input_size, m)``.
            y: Targets for that batch, ``(output_size, m)``.
        """
        m = x.shape[1]
        # NOTE: the output error omits the tanh'(z2) = 1 - output**2 factor, so
        # this is not the exact gradient of the MSE that train() reports. It is
        # kept as-is to preserve the training behaviour of existing runs.
        dz2 = self.output - y
        dW2 = (1/m) * cp.dot(dz2, self.a1.T)
        db2 = (1/m) * cp.sum(dz2, axis=1, keepdims=True)
        # Hidden layer: chain rule through tanh, whose derivative is 1 - a1**2.
        dz1 = cp.dot(self.W2.T, dz2) * (1 - cp.power(self.a1, 2))
        dW1 = (1/m) * cp.dot(dz1, x.T)
        db1 = (1/m) * cp.sum(dz1, axis=1, keepdims=True)
        self.update_weights(dW1, db1, dW2, db2)

    def update_weights(self, dW1, db1, dW2, db2):
        """Apply one bias-corrected Adam update to all four parameters."""
        self.t += 1

        def adam_update(m, v, grad, beta1, beta2, epsilon, t):
            """Return the updated moments and the bias-corrected step direction."""
            m = beta1 * m + (1 - beta1) * grad
            v = beta2 * v + (1 - beta2) * cp.power(grad, 2)
            m_hat = m / (1 - cp.power(beta1, t))
            v_hat = v / (1 - cp.power(beta2, t))
            return m, v, m_hat / (cp.sqrt(v_hat) + epsilon)

        self.mW1, self.vW1, mW1_hat = adam_update(self.mW1, self.vW1, dW1, self.beta1, self.beta2, self.epsilon, self.t)
        self.mb1, self.vb1, mb1_hat = adam_update(self.mb1, self.vb1, db1, self.beta1, self.beta2, self.epsilon, self.t)
        self.mW2, self.vW2, mW2_hat = adam_update(self.mW2, self.vW2, dW2, self.beta1, self.beta2, self.epsilon, self.t)
        self.mb2, self.vb2, mb2_hat = adam_update(self.mb2, self.vb2, db2, self.beta1, self.beta2, self.epsilon, self.t)

        self.W1 -= self.learning_rate * mW1_hat
        self.b1 -= self.learning_rate * mb1_hat
        self.W2 -= self.learning_rate * mW2_hat
        self.b2 -= self.learning_rate * mb2_hat

    def train(self, X, y, X_val, y_val, epochs=1, batch_size=1024, patience=10):
        """Mini-batch training with early stopping on the validation loss.

        ``tqdm`` and ``mean_squared_error`` are not imported in this cell; they
        must already be available in the notebook namespace when this is called.

        Args:
            X, y: Training inputs ``(input_size, m)`` and targets ``(output_size, m)``.
            X_val, y_val: Validation inputs and targets in the same layout.
            epochs: Maximum number of passes over the training data.
            batch_size: Number of columns (samples) per update.
            patience: Stop after this many consecutive epochs without a new
                best validation loss.

        Returns:
            ``(losses, val_losses, stopped_epoch)``: the per-epoch training and
            validation MSE, and the epoch index at which early stopping fired
            (``0`` if it never did). The weights with the best validation loss
            are restored before returning.
        """
        losses, val_losses = [], []
        best_val_loss, best_weights = float('inf'), None
        epochs_no_improve, stopped_epoch = 0, 0
        num_batches = (X.shape[1] + batch_size - 1) // batch_size
        total_steps = epochs * num_batches

        with tqdm(total=total_steps, desc="Training Progress") as bar:
            for epoch in range(epochs):
                for i in range(0, X.shape[1], batch_size):
                    x_batch = X[:, i:i + batch_size]
                    y_batch = y[:, i:i + batch_size]
                    self.forward(x_batch)
                    self.backward(x_batch, y_batch)
                    bar.update(1)

                # Full-dataset losses are evaluated on the host via .get().
                y_pred_train = self.predict(X)
                loss = mean_squared_error(y.get(), y_pred_train.get())
                losses.append(loss)

                y_pred_val = self.predict(X_val)
                val_loss = mean_squared_error(y_val.get(), y_pred_val.get())
                val_losses.append(val_loss)

                if val_loss < best_val_loss:
                    # Snapshot copies: the live arrays are updated in place.
                    best_val_loss, best_weights = val_loss, (self.W1.copy(), self.b1.copy(), self.W2.copy(), self.b2.copy())
                    epochs_no_improve = 0
                else:
                    epochs_no_improve += 1

                if epochs_no_improve >= patience:
                    stopped_epoch = epoch
                    break

            if best_weights is not None:
                self.W1, self.b1, self.W2, self.b2 = best_weights

            return losses, val_losses, stopped_epoch

    def __str__(self):
        """Return a dump of all four parameter arrays."""
        return f'\nW1={self.W1}\nb1={self.b1}\nW2={self.W2}\nb2={self.b2}'

    def save(self, file_path):
        """Write the four parameter arrays to ``file_path`` in NumPy ``.npz`` format."""
        np.savez(file_path,
                 W1=self.W1.get(),
                 b1=self.b1.get(),
                 W2=self.W2.get(),
                 b2=self.b2.get())
        print(f"Model saved to {file_path}")

    def load(self, file_path):
        """Replace the parameters with those stored in an ``.npz`` file.

        The checkpoint decides the layer sizes: the size attributes are updated
        from the loaded arrays, and the Adam state is reset so its shapes match
        (the file does not contain optimiser state).
        """
        # The context manager closes the underlying archive once the arrays
        # have been copied out.
        with np.load(file_path) as data:
            self.W1 = cp.array(data['W1'])
            self.b1 = cp.array(data['b1'])
            self.W2 = cp.array(data['W2'])
            self.b2 = cp.array(data['b2'])

        self.hidden_size, self.input_size = self.W1.shape
        self.output_size = self.W2.shape[0]
        self.initialize_adam_parameters()
        self.t = 0
        print(f"Model loaded from {file_path}")
def generate_data(num_samples=10000, seed=None):
    """Generate a noisy two-input XOR dataset.

    Each coordinate is drawn, with equal probability, from either the "low"
    band ``[-0.5, 0.2)`` or the "high" band ``[0.8, 1.5)``. A coordinate counts
    as logical 1 when it is greater than 0.5, and the label is the XOR of the
    two resulting bits.

    Args:
        num_samples: Number of rows to generate.
        seed: Optional seed for a private ``numpy.random.default_rng``. When
            ``None`` (the default) the global ``np.random`` state is used, so
            ``np.random.seed`` still controls the draw.

    Returns:
        ``(X, y)`` where ``X`` is a float array of shape ``(num_samples, 2)``
        and ``y`` is an integer array of shape ``(num_samples, 1)``.
    """
    rng = np.random if seed is None else np.random.default_rng(seed)
    shape = (num_samples, 2)

    # Draw every coordinate at once instead of looping element by element.
    low_band = rng.uniform(-0.5, 0.2, size=shape)
    high_band = rng.uniform(0.8, 1.5, size=shape)
    use_high = rng.random(shape) < 0.5
    X = np.where(use_high, high_band, low_band)

    # Convert the values to 0 or 1.
    X_binarized = (X > 0.5).astype(int)
    # Compute the XOR label.
    y = np.bitwise_xor(X_binarized[:, 0], X_binarized[:, 1]).reshape(-1, 1)

    return X, y

def check_and_save_data(csv_path, num_samples):
    """Load the dataset cached at ``csv_path``, creating it first if it is missing.

    When the file exists it is read as-is and ``num_samples`` is ignored.
    Otherwise ``generate_data(num_samples)`` is called and the result is
    written to ``csv_path`` with columns ``x1``, ``x2`` and ``y``.

    Returns:
        ``(X, y)``: features of shape ``(n, 2)`` and labels of shape ``(n, 1)``.
    """
    if not os.path.exists(csv_path):
        print("No existing dataset found. Generating new data...")
        X, y = generate_data(num_samples)
        frame = pd.DataFrame({'x1': X[:, 0], 'x2': X[:, 1], 'y': y.flatten()})
        frame.to_csv(csv_path, index=False)
        print(f"Dataset saved to {csv_path}")
        return X, y

    print(f"Loading existing dataset from {csv_path}")
    frame = pd.read_csv(csv_path)
    features = frame[['x1', 'x2']].values
    labels = frame['y'].values.reshape(-1, 1)
    return features, labels

class Config:
    """Experiment settings, exposed as class attributes."""

    LEARNING_RATE = 0.001
    EPOCHS = 10
    BATCH_SIZE = 2 ** 4
    NUM_SAMPLES = 2 ** 14
    # Inclusive bounds of the hidden-layer sizes to try.
    MIN_RANGE = 2
    MAX_RANGE = 2
    HIDDEN_SIZES = range(MIN_RANGE, MAX_RANGE + 1)
    ROUNDS = 5
    # Early-stopping patience: half the epoch budget.
    PATIENCE = int(EPOCHS * 0.5)

    def __str__(self):
        """Return a multi-line, indented listing of every setting."""
        shown = (
            ("LEARNING_RATE", self.LEARNING_RATE),
            ("EPOCHS", self.EPOCHS),
            ("BATCH_SIZE", self.BATCH_SIZE),
            ("NUM_SAMPLES", self.NUM_SAMPLES),
            ("MIN_RANGE", self.MIN_RANGE),
            ("MAX_RANGE", self.MAX_RANGE),
            ("HIDDEN_SIZES", list(self.HIDDEN_SIZES)),
            ("ROUNDS", self.ROUNDS),
            ("PATIENCE", self.PATIENCE),
        )
        listing = "".join(f"  {label}={value}\n" for label, value in shown)
        return "Config:\n" + listing

# --- Data preparation and model loading (module-level notebook state) ---
# Later code in this notebook relies on the globals and files created here.
config = Config()
X, y = check_and_save_data('dataset.csv', config.NUM_SAMPLES)
# The first 80% of the rows form the training split; no shuffling is done here.
train_size = int(0.8 * X.shape[0])
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Each input column gets its own StandardScaler, fitted on the training rows only.
scaler_X_0, scaler_X_1 = StandardScaler(), StandardScaler()
X_train_0 = scaler_X_0.fit_transform(X_train[:, 0].reshape(-1, 1))
X_train_1 = scaler_X_1.fit_transform(X_train[:, 1].reshape(-1, 1))
X_train = np.hstack((X_train_0, X_train_1))

# The test rows reuse the statistics learned from the training rows.
X_test_0 = scaler_X_0.transform(X_test[:, 0].reshape(-1, 1))
X_test_1 = scaler_X_1.transform(X_test[:, 1].reshape(-1, 1))
X_test = np.hstack((X_test_0, X_test_1))

# NOTE(review): scaler_y is created but never fitted or used in this cell;
# the labels are left unscaled.
scaler_y = StandardScaler()
y_train = y_train.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

# Save the scaler statistics so that calculate() can reload them from disk.

np.save('scaler_x0_mean.npy', scaler_X_0.mean_)
np.save('scaler_x0_scale.npy', scaler_X_0.scale_)
np.save('scaler_x1_mean.npy', scaler_X_1.mean_)
np.save('scaler_x1_scale.npy', scaler_X_1.scale_)

# NOTE(review): X_train_T and y_train_T are not referenced again in the
# visible cells.
X_train_T = X_train.T
y_train_T = y_train.reshape(1, -1)

# Convert to float32 CuPy arrays laid out as (features, samples), which is the
# layout ShallowNeuralNetwork.forward expects.
X_train = cp.array(X_train.T, dtype=cp.float32)
X_test = cp.array(X_test.T, dtype=cp.float32)
y_train = cp.array(y_train.T, dtype=cp.float32)
y_test = cp.array(y_test.T, dtype=cp.float32)

# The freshly initialised weights are immediately replaced by the arrays stored
# in the checkpoint file.
nn = ShallowNeuralNetwork(input_size=2, hidden_size=10, output_size=1, learning_rate=0.01)
nn.load('best_val_model.npz')

def calculate(input_x1, input_x2):
    """Run the module-level network ``nn`` on one raw ``(x1, x2)`` point.

    The inputs are standardised with the per-feature mean and scale stored in
    the ``scaler_x*_mean.npy`` / ``scaler_x*_scale.npy`` files, which are read
    from the working directory on every call.

    Args:
        input_x1: Raw (unscaled) first input.
        input_x2: Raw (unscaled) second input.

    Returns:
        The network output copied back to the host as a NumPy array.
    """
    x1_mean = np.load('scaler_x0_mean.npy')
    x1_scale = np.load('scaler_x0_scale.npy')
    x2_mean = np.load('scaler_x1_mean.npy')
    x2_scale = np.load('scaler_x1_scale.npy')

    # Apply the standardisation (x - mean) / scale directly. This avoids
    # constructing unfitted StandardScaler objects and hand-assigning their
    # fitted attributes, which depends on how sklearn validates fitted state.
    scaled = np.array(
        [(input_x1 - x1_mean) / x1_scale, (input_x2 - x2_mean) / x2_scale],
        dtype=np.float32,
    ).reshape(2, 1)

    # One column = one sample, matching the (features, samples) layout of forward().
    inputs = cp.array(scaled)
    return nn.forward(inputs).get()

Loading existing dataset from dataset.csv
Model loaded from best_val_model.npz


In [19]:
def print_result(x1, x2, threshold=0.75):
    """Print the network's XOR prediction for ``(x1, x2)`` next to the ideal answer.

    Args:
        x1: Raw first input; treated as logical 1 when greater than 0.5.
        x2: Raw second input; treated as logical 1 when greater than 0.5.
        threshold: Network outputs at or above this value are reported as
            class 1, everything else as class 0.
    """
    value = calculate(x1, x2).item()

    # A plain comparison keeps the predicted class in {0, 1}. The previous
    # floor(value / 0.75) reported class -1 for negative outputs, which the
    # tanh output layer can produce.
    predicted = int(value >= threshold)
    ideal = int(x1 > 0.5) ^ int(x2 > 0.5)
    print(f'{x1:.2f}\txor\t{x2:.2f}\t= {value:.6f} ~ {predicted}\t(ideal: {ideal})')

# Hand-picked inputs kept for manual spot checks; uncomment this list (and
# comment out the random draw below) to evaluate on fixed points instead.
# numbers = [(-0.5, -0.5),
#            (0.2, 0.2),
#            (0.8, 0.8),
#            (1.5, 1.5),
#            (-0.3, 1.2),
#            (-0.41457323560865483,1.0737424253012686),
#            (-0.29388577555129003,1.139695911993421),
#            (1.036596609796075,-0.0947945731447089),
#            (0.16133601091769956,-0.3312406173692577),
#            (-0.3786384832936716,-0.463855165487841),
#            (1.2340509291499595,0.01902263779733182),
#            (1.042442196056173,1.4429183074575107)
#            ]
# The generated labels are not needed here (print_result derives the ideal
# answer itself). Bind them to a throw-away name so the global `y` holding the
# dataset labels from the previous cell is not overwritten.
numbers, _sample_labels = generate_data(20)
for x1, x2 in numbers:
    print_result(x1, x2)

1.01	xor	1.38	= 0.002226 ~ 0	(ideal: 0)
0.09	xor	1.24	= 0.756392 ~ 1	(ideal: 1)
1.17	xor	0.93	= 0.002040 ~ 0	(ideal: 0)
1.46	xor	1.05	= 0.047598 ~ 0	(ideal: 0)
1.40	xor	-0.02	= 0.759916 ~ 1	(ideal: 1)
0.84	xor	0.99	= 0.002040 ~ 0	(ideal: 0)
-0.16	xor	1.38	= 0.756392 ~ 1	(ideal: 1)
-0.09	xor	-0.07	= 0.002040 ~ 0	(ideal: 0)
0.00	xor	0.85	= 0.756392 ~ 1	(ideal: 1)
-0.36	xor	-0.14	= 0.002040 ~ 0	(ideal: 0)
0.10	xor	1.48	= 0.756392 ~ 1	(ideal: 1)
-0.39	xor	1.03	= 0.756392 ~ 1	(ideal: 1)
-0.49	xor	0.92	= 0.756392 ~ 1	(ideal: 1)
0.10	xor	1.39	= 0.756392 ~ 1	(ideal: 1)
0.02	xor	1.33	= 0.756392 ~ 1	(ideal: 1)
-0.27	xor	-0.05	= 0.002040 ~ 0	(ideal: 0)
1.44	xor	-0.20	= 0.759916 ~ 1	(ideal: 1)
1.25	xor	1.27	= 0.002040 ~ 0	(ideal: 0)
1.40	xor	0.90	= 0.756013 ~ 1	(ideal: 0)
-0.40	xor	0.08	= 0.341424 ~ 0	(ideal: 0)
