In [1]:
import numpy as np
import cupy as cp
from sklearn.preprocessing import StandardScaler
import pandas as pd
import os

class ShallowNeuralNetwork:
    """One-hidden-layer network (tanh hidden layer, linear output) on CuPy arrays.

    Data is laid out column-wise: inputs are ``(input_size, n_samples)`` and
    outputs are ``(output_size, n_samples)``.  The instance also carries Adam
    hyper-parameters and moment buffers, although no training step is defined
    in this class.
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01, init_method='xavier'):
        """Store the layer sizes and hyper-parameters, then initialize the weights.

        Args:
            input_size: Number of input features.
            hidden_size: Number of hidden units.
            output_size: Number of output units.
            learning_rate: Step size stored on the instance.
            init_method: ``'xavier'`` or ``'he'`` (see ``initialize_weights``).

        Raises:
            ValueError: If ``init_method`` is not one of the supported names.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.learning_rate = learning_rate
        # Adam hyper-parameters and step counter.
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.epsilon = 1e-8
        self.t = 0

        self.W1, self.b1, self.W2, self.b2 = self.initialize_weights(init_method)
        self.initialize_adam_parameters()

    def initialize_adam_parameters(self):
        """Create zero-filled first/second moment buffers shaped like each parameter."""
        self.mW1, self.vW1 = cp.zeros_like(self.W1), cp.zeros_like(self.W1)
        self.mb1, self.vb1 = cp.zeros_like(self.b1), cp.zeros_like(self.b1)
        self.mW2, self.vW2 = cp.zeros_like(self.W2), cp.zeros_like(self.W2)
        self.mb2, self.vb2 = cp.zeros_like(self.b2), cp.zeros_like(self.b2)

    def initialize_weights(self, method):
        """Return freshly drawn ``(W1, b1, W2, b2)``.

        Weights are standard-normal draws scaled by ``sqrt(gain / fan_in)`` with
        ``gain`` 1 for ``'xavier'`` and 2 for ``'he'``; biases start at zero.

        Raises:
            ValueError: If ``method`` is neither ``'xavier'`` nor ``'he'``.
        """
        gains = {'xavier': 1, 'he': 2}
        if method not in gains:
            raise ValueError("init_method must be either 'xavier' or 'he'")
        gain = gains[method]

        W1 = cp.random.randn(self.hidden_size, self.input_size) * cp.sqrt(gain / self.input_size)
        W2 = cp.random.randn(self.output_size, self.hidden_size) * cp.sqrt(gain / self.hidden_size)

        b1 = cp.zeros((self.hidden_size, 1))
        b2 = cp.zeros((self.output_size, 1))
        return W1, b1, W2, b2

    def forward(self, x):
        """Run a forward pass and cache the intermediates on the instance.

        Args:
            x: Array of shape ``(input_size, n_samples)``.

        Returns:
            The linear output ``W2 @ tanh(W1 @ x + b1) + b2``, also stored as
            ``self.output`` (``self.z1``, ``self.a1`` and ``self.z2`` are kept too).
        """
        self.z1 = cp.dot(self.W1, x) + self.b1
        self.a1 = cp.tanh(self.z1)
        self.z2 = cp.dot(self.W2, self.a1) + self.b2
        self.output = self.z2
        return self.output

    def predict(self, X):
        """Return the network output for ``X``; equivalent to ``forward(X)``."""
        self.forward(X)
        return self.output

    def load(self, file_path):
        """Replace the weights and biases with arrays stored in an ``.npz`` archive.

        The archive must contain the keys ``W1``, ``b1``, ``W2`` and ``b2``.
        The size attributes are updated to match the loaded matrices.  If any
        loaded shape differs from the current parameters, the Adam state is
        reset, because moment buffers of the old shape could not be applied.

        Args:
            file_path: Path of the ``.npz`` file to read.
        """
        names = ('W1', 'b1', 'W2', 'b2')
        # The context manager closes the archive's file handle deterministically.
        with np.load(file_path) as data:
            loaded = {name: cp.array(data[name]) for name in names}

        shapes_changed = False
        for name in names:
            previous = getattr(self, name, None)
            if previous is None or previous.shape != loaded[name].shape:
                shapes_changed = True

        self.W1, self.b1, self.W2, self.b2 = (loaded[name] for name in names)

        # Keep the bookkeeping attributes in step with what was actually loaded.
        if self.W1.ndim == 2 and self.W2.ndim == 2:
            self.hidden_size, self.input_size = self.W1.shape
            self.output_size = self.W2.shape[0]
        if shapes_changed:
            self.t = 0
            self.initialize_adam_parameters()
        print(f"Model loaded from {file_path}")
def generate_data(num_samples=10000, seed=42):
    """Generate integer addition examples.

    Args:
        num_samples: Number of (x1, x2) pairs to draw.
        seed: Seed for the private random generator.  The default of 42
            reproduces the data produced by the former
            ``np.random.seed(42)`` + ``np.random.randint`` sequence.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(num_samples, 2)`` with integers in
        ``[1000, 10000)`` and ``y`` has shape ``(num_samples, 1)`` holding the
        row sums of ``X``.
    """
    # A private RandomState yields the same stream as the legacy global
    # generator for a given seed, without reseeding NumPy's global RNG.
    rng = np.random.RandomState(seed)
    X = rng.randint(1000, 10000, (num_samples, 2))
    y = np.sum(X, axis=1).reshape(-1, 1)
    return X, y

def check_and_save_data(csv_path, num_samples):
    """Load the dataset from ``csv_path``, or generate and save it if missing.

    Args:
        csv_path: CSV file with columns ``x1``, ``x2`` and ``y``.
        num_samples: Number of samples to generate when the file does not
            exist.  An existing file is always used as-is; a warning is
            printed when its row count differs from ``num_samples``.

    Returns:
        ``(X, y)`` with ``X`` of shape ``(n, 2)`` and ``y`` of shape ``(n, 1)``.
    """
    if os.path.exists(csv_path):
        print(f"Loading existing dataset from {csv_path}")
        data = pd.read_csv(csv_path)
        # num_samples has no effect on an existing file, so make a stale or
        # differently-sized dataset visible instead of using it silently.
        if len(data) != num_samples:
            print(
                f"Warning: {csv_path} holds {len(data)} rows but "
                f"{num_samples} were requested; using the file as-is"
            )
        X = data[['x1', 'x2']].values
        y = data['y'].values.reshape(-1, 1)
    else:
        print("No existing dataset found. Generating new data...")
        X, y = generate_data(num_samples)
        pd.DataFrame({'x1': X[:, 0], 'x2': X[:, 1], 'y': y.flatten()}).to_csv(csv_path, index=False)
        print(f"Dataset saved to {csv_path}")
    return X, y

class Config:
    """Experiment settings, kept as class-level constants."""

    EPOCHS = 100
    BATCH_SIZE = 2 ** 4
    NUM_SAMPLES = 2 ** 14
    LEARNING_RATE = 0.001
    MIN_RANGE = 1
    MAX_RANGE = 20
    # Hidden-layer widths from MIN_RANGE to MAX_RANGE inclusive.
    HIDDEN_SIZES = range(MIN_RANGE, MAX_RANGE + 1)
    ROUNDS = 30
    # 10% of the epoch budget, truncated to an integer.
    PATIENCE = int(EPOCHS * 0.1)

    def __str__(self):
        """Return a multi-line ``NAME=value`` listing that ends with a newline."""
        ordered_names = (
            "LEARNING_RATE",
            "EPOCHS",
            "BATCH_SIZE",
            "NUM_SAMPLES",
            "MIN_RANGE",
            "MAX_RANGE",
            "HIDDEN_SIZES",
            "ROUNDS",
            "PATIENCE",
        )
        rows = ["Config:"]
        for name in ordered_names:
            value = getattr(self, name)
            if name == "HIDDEN_SIZES":
                # Show the expanded list rather than the range() repr.
                value = list(value)
            rows.append(f"  {name}={value}")
        return "\n".join(rows) + "\n"

# Build the settings object and obtain the dataset: check_and_save_data reads
# dataset.csv when it exists, otherwise it generates the data and writes the file.
config = Config()
X, y = check_and_save_data('dataset.csv', config.NUM_SAMPLES)
# Sequential 80/20 split: the first 80% of the rows train, the remainder tests.
train_size = int(0.8 * X.shape[0])
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Standardize each input column with its own scaler, fitted on the training rows only.
scaler_X_0, scaler_X_1 = StandardScaler(), StandardScaler()
X_train_0 = scaler_X_0.fit_transform(X_train[:, 0].reshape(-1, 1))
X_train_1 = scaler_X_1.fit_transform(X_train[:, 1].reshape(-1, 1))
X_train = np.hstack((X_train_0, X_train_1))

# The test rows reuse the statistics fitted on the training rows.
X_test_0 = scaler_X_0.transform(X_test[:, 0].reshape(-1, 1))
X_test_1 = scaler_X_1.transform(X_test[:, 1].reshape(-1, 1))
X_test = np.hstack((X_test_0, X_test_1))

# The target is standardized as well, again fitted on the training rows.
scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train.reshape(-1, 1))
y_test = scaler_y.transform(y_test.reshape(-1, 1))

# Save the scalers' statistics (mean_ and scale_) to .npy files; calculate()
# reads these same files back from the working directory.

np.save('scaler_x0_mean.npy', scaler_X_0.mean_)
np.save('scaler_x0_scale.npy', scaler_X_0.scale_)
np.save('scaler_x1_mean.npy', scaler_X_1.mean_)
np.save('scaler_x1_scale.npy', scaler_X_1.scale_)

np.save('scaler_y_mean.npy', scaler_y.mean_)
np.save('scaler_y_scale.npy', scaler_y.scale_)

# NumPy-side transposed arrays; not referenced again in the code visible here.
X_train_T = X_train.T
y_train_T = y_train.reshape(1, -1)

# Convert to float32 CuPy arrays in the column-wise layout the network expects:
# X_* become (2, n_samples) and y_* become (1, n_samples).
X_train = cp.array(X_train.T, dtype=cp.float32)
X_test = cp.array(X_test.T, dtype=cp.float32)
y_train = cp.array(y_train.T, dtype=cp.float32)
y_test = cp.array(y_test.T, dtype=cp.float32)

# Create the network, then replace its randomly initialized weights with the
# arrays stored in best_val_model.npz.
# NOTE(review): hidden_size=10 is presumably the width the saved model was
# trained with — confirm. learning_rate is given as 0.01 here rather than
# config.LEARNING_RATE; no training happens in the visible code.
nn = ShallowNeuralNetwork(input_size=2, hidden_size=10, output_size=1, learning_rate=0.01, init_method='xavier')
nn.load('best_val_model.npz')

def calculate(input_x1, input_x2):
    """Predict ``input_x1 + input_x2`` with the module-level network ``nn``.

    The inputs are standardized with the statistics saved in
    ``scaler_x0_*.npy`` / ``scaler_x1_*.npy``, passed through ``nn``, and the
    output is mapped back to the original scale with ``scaler_y_*.npy``.  All
    six files are read from the current working directory on every call.

    Args:
        input_x1: First operand (scalar).
        input_x2: Second operand (scalar).

    Returns:
        NumPy array of shape ``(1, 1)`` holding the de-standardized prediction.
    """
    def _load_stats(prefix):
        # Each scaler is stored as a (mean, scale) pair of .npy files.
        return np.load(f'{prefix}_mean.npy'), np.load(f'{prefix}_scale.npy')

    x0_mean, x0_scale = _load_stats('scaler_x0')
    x1_mean, x1_scale = _load_stats('scaler_x1')
    y_mean, y_scale = _load_stats('scaler_y')

    # Same arithmetic as StandardScaler.transform, (x - mean) / scale, applied
    # directly so no unfitted estimator has to be patched with mean_/scale_.
    features = np.array(
        [(input_x1 - x0_mean) / x0_scale, (input_x2 - x1_mean) / x1_scale],
        dtype=np.float64,
    ).reshape(2, 1)
    inputs = cp.array(features, dtype=cp.float32)

    # .get() copies the CuPy result back into a NumPy array.
    prediction = nn.forward(inputs).get()

    # Same arithmetic as StandardScaler.inverse_transform: y * scale + mean.
    output_unscaled = prediction * y_scale + y_mean
    return output_unscaled

Loading existing dataset from dataset.csv
Model loaded from best_val_model.npz


In [3]:
def print_sum(x1, x2):
    """Print the model's estimate of ``x1 + x2`` next to the exact sum."""
    predicted = calculate(x1, x2).item()
    ideal = x1 + x2
    print(f'{x1} + {x2} = {predicted:.2f}\t(ideal: {ideal})')

# Operand pairs to demonstrate; the last two sit at the edges of the range
# generate_data samples from (1000 to 9999).
numbers = [
    (9635, 4382),
    (3214, 4653),
    (7235, 1472),
    (1000, 1000),
    (9999, 9999),
]

for pair in numbers:
    print_sum(*pair)

9635 + 4382 = 14028.52	(ideal: 14017)
3214 + 4653 = 7867.20	(ideal: 7867)
7235 + 1472 = 8710.01	(ideal: 8707)
1000 + 1000 = 2108.21	(ideal: 2000)
9999 + 9999 = 19874.37	(ideal: 19998)
