Sieć perceptronowa - nasz

In [14]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler

# Seed NumPy's global RNG so the random draws made later in the notebook are repeatable.
np.random.seed(42)
# The input file is semicolon-separated.
data = pd.read_csv('XAU_1d_data.csv', delimiter=";")

# Parse the Date column and keep only the calendar-date part.
# NOTE(review): rows are kept in file order; the chronological split further down
# presumably relies on the CSV already being sorted by date — confirm.
data['Date'] = pd.to_datetime(data['Date']).dt.date

# Drop every row that has a missing value in any column.
data = data.dropna()

# Model inputs and the regression target.
features = ['Open', 'High', 'Low', 'Volume']
target = 'Close'


# Outlier removal: keep only rows within mean +/- 3 standard deviations.
# The filter is applied one column at a time and `data` is reassigned on each pass,
# so the mean/std of a later column are computed on rows that survived the earlier columns.
for col in features + [target]:
    mean = data[col].mean()
    std = data[col].std()
    data = data[(data[col] >= mean - 3*std) & (data[col] <= mean + 3*std)]

# Feature matrix (one column per entry of `features`) and the target as a single column.
X = data[features].values
y = data[target].values.reshape(-1, 1)

# Features and target side by side; the target ends up in the last column.
combined = np.concatenate((X, y), axis=1)

def split_time_series(data, train_ratio=0.6, val_ratio=0.2):
    """Cut `data` into consecutive train / validation / test segments.

    The input order is preserved (no shuffling): the first `train_ratio`
    share of the rows is the training part, the next `val_ratio` share is
    the validation part and whatever remains is the test part.

    Returns:
        Tuple ``(train_data, validation_data, test_data)`` of slices of `data`.
    """
    total = len(data)
    first_cut = int(total * train_ratio)
    second_cut = int(total * (train_ratio + val_ratio))

    return data[:first_cut], data[first_cut:second_cut], data[second_cut:]

# Chronological 60/20/20 split of the combined feature+target matrix.
train_data, validation_data, test_data = split_time_series(combined)

# The first len(features) columns are the inputs, the remainder is the target.
X_train, y_train = np.hsplit(train_data, [len(features)])
X_validation, y_validation = np.hsplit(validation_data, [len(features)])
X_test, y_test = np.hsplit(test_data, [len(features)])

# Standardise the inputs with statistics learned from the training part only.
scaler = StandardScaler().fit(X_train)
X_train, X_validation, X_test = (
    scaler.transform(part) for part in (X_train, X_validation, X_test)
)

# Same for the target; `scaler_y` is kept so predictions can be mapped back later.
scaler_y = StandardScaler().fit(y_train)
y_train, y_validation, y_test = (
    scaler_y.transform(part) for part in (y_train, y_validation, y_test)
)

print("\nRozmiary zbiorów:")
print(f"Train: X={X_train.shape}, y={y_train.shape}")
print(f"Validation: X={X_validation.shape}, y={y_validation.shape}")
print(f"Test: X={X_test.shape}, y={y_test.shape}")



Rozmiary zbiorów:
Train: X=(3145, 4), y=(3145, 1)
Validation: X=(1049, 4), y=(1049, 1)
Test: X=(1049, 4), y=(1049, 1)


In [15]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import LabelEncoder, StandardScaler

def relu(x, derivative=False):
    """Rectified linear unit applied element-wise.

    With ``derivative=False`` returns ``max(0, x)``. With ``derivative=True``
    returns a float array holding 1.0 where ``x > 0`` and 0.0 elsewhere.
    """
    if not derivative:
        return np.maximum(0, x)
    positive_mask = x > 0
    return positive_mask.astype(float)

# Inicjalizacja wag sieci dla wielu warstw ukrytych
# def initialize_weights(input_size, hidden_layers_sizes, output_size):
#     weights = []
#     layer_sizes = [input_size] + hidden_layers_sizes + [output_size]
#     for i in range(len(layer_sizes) - 1):
#         weights.append(2 * np.random.random((layer_sizes[i], layer_sizes[i+1])) - 1)
#     return weights

def initialize_weights(input_size, hidden_layers_sizes, output_size):
    """Create one random weight matrix per pair of consecutive layers.

    Each matrix has shape ``(fan_in, fan_out)`` and is drawn from a standard
    normal distribution scaled by ``sqrt(2 / fan_in)``. Matrices are drawn in
    order from the input layer to the output layer using NumPy's global RNG.

    Returns:
        List of weight matrices, ``len(hidden_layers_sizes) + 1`` long.
    """
    layer_sizes = [input_size] + hidden_layers_sizes + [output_size]
    return [
        np.random.randn(fan_in, fan_out) * np.sqrt(2 / fan_in)
        for fan_in, fan_out in zip(layer_sizes[:-1], layer_sizes[1:])
    ]

# Random split of the data into training, validation and test sets
def split_data(data, train_ratio=0.6, validation_ratio=0.2):
    """Shuffle the rows of `data` and split them into train / validation / test.

    The shuffle is done on a copy, so the caller's `data` keeps its original
    order (`np.random.shuffle` works in place and would otherwise reorder the
    array that was passed in). The permutation is drawn from NumPy's global RNG.

    Args:
        data: Sequence with a ``copy()`` method (e.g. ``numpy.ndarray`` or list);
            rows are taken along the first axis.
        train_ratio: Share of rows placed in the training part.
        validation_ratio: Share of rows placed in the validation part.

    Returns:
        Tuple ``(train_data, validation_data, test_data)``; the test part
        receives all rows left after the first two parts.
    """
    shuffled = data.copy()
    np.random.shuffle(shuffled)  # in place, but only on the private copy

    train_size = int(len(shuffled) * train_ratio)
    validation_size = int(len(shuffled) * validation_ratio)
    validation_end = train_size + validation_size

    train_data = shuffled[:train_size]
    validation_data = shuffled[train_size:validation_end]
    test_data = shuffled[validation_end:]

    return train_data, validation_data, test_data

# Learning-rate adaptation rule
def adjust_learning_rate(learning_rate, mse, previous_mse, learning_rate_adjust, threshold=1e-6):
    """Return the learning rate to use after comparing two consecutive losses.

    The rate is multiplied by 1.05 when the loss went down and by 0.7
    otherwise. If the absolute change in loss is smaller than `threshold`,
    the result is additionally multiplied by `learning_rate_adjust`.
    """
    improved = mse < previous_mse
    adjusted = learning_rate * (1.05 if improved else 0.7)

    stalled = abs(mse - previous_mse) < threshold
    if stalled:
        adjusted = adjusted * learning_rate_adjust

    return adjusted

# Training of a network with any number of hidden layers
def train(X, y, learning_rate, learning_rate_adjust, epochs,
          hidden_layers_sizes, optimizer, momentum):
    """Train a bias-free ReLU network with full-batch gradient descent.

    Hidden layers use ReLU; the output layer is linear. Every epoch runs one
    forward pass over all of `X`, adapts the learning rate from the change in
    mean squared error via `adjust_learning_rate`, back-propagates the error
    and updates all weight matrices.

    Args:
        X: 2-D input array, one row per sample.
        y: 2-D target array with the same number of rows as `X`.
        learning_rate: Initial step size (adapted every epoch).
        learning_rate_adjust: Extra multiplier forwarded to
            `adjust_learning_rate` for epochs where the loss barely changes.
        epochs: Number of full-batch updates.
        hidden_layers_sizes: List with the width of each hidden layer.
        optimizer: ``'gd'`` (plain gradient descent) or ``'momentum'``.
        momentum: Velocity decay factor; only used when
            ``optimizer == 'momentum'``.

    Returns:
        List of trained weight matrices, input layer first.

    Raises:
        ValueError: If `optimizer` is not one of the supported names.
    """
    # Fail fast: an unrecognised optimizer name used to skip every weight
    # update and silently hand back the random initial weights.
    if optimizer not in ('gd', 'momentum'):
        raise ValueError(
            f"Unknown optimizer {optimizer!r}; expected 'gd' or 'momentum'"
        )

    input_size = X.shape[1]
    output_size = y.shape[1]
    weights = initialize_weights(input_size, hidden_layers_sizes, output_size)
    velocities = [np.zeros_like(w) for w in weights]
    prev_loss = np.inf
    last_layer = len(weights) - 1

    for _ in range(epochs):
        # Forward pass: activations[i] is the input fed to weights[i].
        activations = [X]
        for i, w in enumerate(weights):
            z = np.dot(activations[-1], w)
            # Linear output layer, ReLU everywhere else.
            activations.append(z if i == last_layer else relu(z))
        predicted_output = activations[-1]

        error = predicted_output - y
        loss = np.mean(error ** 2)

        # The first epoch compares against +inf, so it always counts as an improvement.
        learning_rate = adjust_learning_rate(
            learning_rate,
            loss,
            prev_loss,
            learning_rate_adjust
        )
        prev_loss = loss

        # Backpropagation: the output delta is the raw error (the constant
        # factor of the MSE derivative is left to the learning rate).
        deltas = [error]
        for i in range(len(weights) - 1, 0, -1):
            # ReLU'(z) equals (activation > 0), so the stored activations suffice.
            delta = deltas[-1].dot(weights[i].T) * relu(activations[i], derivative=True)
            deltas.append(delta)

        deltas.reverse()  # now deltas[i] belongs to weights[i]

        # Weight update
        for i in range(len(weights)):
            grad = activations[i].T.dot(deltas[i]) / X.shape[0]

            if optimizer == 'gd':
                weights[i] -= learning_rate * grad
            else:  # 'momentum' (validated above)
                velocities[i] = momentum * velocities[i] - learning_rate * grad
                weights[i] += velocities[i]

    return weights


def predict(X, weights):
    """Run a forward pass of the network defined by `weights` on `X`.

    Every layer multiplies its input by the layer's weight matrix; all layers
    except the last one then apply `relu`. The last layer stays linear.
    """
    final_index = len(weights) - 1
    signal = X
    for layer_index, layer_weights in enumerate(weights):
        signal = np.dot(signal, layer_weights)
        if layer_index != final_index:
            signal = relu(signal)
    return signal

def mae_np(y_true, y_pred):
    """Mean absolute error between `y_true` and `y_pred`, computed with NumPy."""
    absolute_errors = np.abs(y_true - y_pred)
    return np.mean(absolute_errors)

# Hyperparameter grid for the experiments (each list holds the values to try)
learning_rates = [0.01]
learning_rate_adjusts = [0.0005]
epochses = [1000]
repeat = 3  # number of independent runs per configuration

# Optimizer names:
#   'gd'       -> plain gradient descent
#   'momentum' -> gradient descent with momentum
optimizers = ['gd', 'momentum']
momentums = [0.0, 0.9]

# Hidden-layer layouts to compare: one inner list per network,
# one entry per hidden layer giving its width
hidden_layers_sizes_list = [[10], [10, 10]]

def calculate_regression_metrics(y_true, y_pred):
    """Return ``(mse, mae, r2)`` for the given targets and predictions.

    The three values come from scikit-learn's `mean_squared_error`,
    `mean_absolute_error` and `r2_score`, in that order.
    """
    metric_functions = (mean_squared_error, mean_absolute_error, r2_score)
    return tuple(metric(y_true, y_pred) for metric in metric_functions)

# One record per trained configuration
results = []

# Experiment grid: every architecture x repeat x hyperparameter combination
for hidden_layers_sizes in hidden_layers_sizes_list:
    for r in range(1, repeat + 1):
        for lr in learning_rates:
            for lr_adj in learning_rate_adjusts:
                for epochs in epochses:
                    for optimizer in optimizers:
                        for momentum in momentums:
                            # NOTE(review): train() does not use `momentum` when
                            # optimizer == 'gd', so the 'gd' rows with momentum 0.9
                            # are just additional random repeats of plain gradient descent.
                            trained_weights = train(
                                X_train, y_train, lr, lr_adj, epochs, hidden_layers_sizes, optimizer=optimizer, momentum=momentum
                            )
                            predictions_train = predict(X_train, trained_weights)
                            predictions_validation = predict(X_validation, trained_weights)
                            predictions_test = predict(X_test, trained_weights)

                            # Undo the target scaling so errors can also be reported
                            # in the original units of the target column
                            y_pred_test_real = scaler_y.inverse_transform(predictions_test)
                            y_test_real = scaler_y.inverse_transform(y_test)

                            # Test metrics on the standardised target (existing columns)
                            mse, mae, r2 = calculate_regression_metrics(y_test, predictions_test)

                            # Previously the train/validation predictions and the
                            # inverse-transformed values were computed but never evaluated;
                            # report them so the validation split and real units are visible.
                            train_mse = mean_squared_error(y_train, predictions_train)
                            val_mse = mean_squared_error(y_validation, predictions_validation)
                            mse_real = mean_squared_error(y_test_real, y_pred_test_real)
                            mae_real = mean_absolute_error(y_test_real, y_pred_test_real)

                            # Existing keys come first so their column positions stay the same
                            results.append({
                                'hidden_layers': str(hidden_layers_sizes),
                                'optimizer': optimizer,
                                'momentum': momentum,
                                'learning_rate': lr,
                                'learning_rate_adjust': lr_adj,
                                'epochs': epochs,
                                'repeat': r,
                                'mse': mse,
                                'mae': mae,
                                'r2': r2,
                                'train_mse': train_mse,
                                'val_mse': val_mse,
                                'mse_real': mse_real,
                                'mae_real': mae_real
                            })

# Build a DataFrame from the collected records
results_df = pd.DataFrame(results)

# Show the results
print(results_df)
print(results_df.head())

# Aggregate the repeats of each configuration (test metrics on the standardised target)
summary = results_df.groupby(
    ['hidden_layers', 'learning_rate', 'optimizer', 'momentum']
).agg(
    avg_mse=('mse', 'mean'),
    avg_mae=('mae', 'mean'),
    avg_r2=('r2', 'mean'),
    best_mse=('mse', 'min'),
    best_r2=('r2', 'max')
).reset_index()

print(summary)


   hidden_layers optimizer  momentum  learning_rate  learning_rate_adjust  \
0           [10]        gd       0.0           0.01                0.0005   
1           [10]        gd       0.9           0.01                0.0005   
2           [10]  momentum       0.0           0.01                0.0005   
3           [10]  momentum       0.9           0.01                0.0005   
4           [10]        gd       0.0           0.01                0.0005   
5           [10]        gd       0.9           0.01                0.0005   
6           [10]  momentum       0.0           0.01                0.0005   
7           [10]  momentum       0.9           0.01                0.0005   
8           [10]        gd       0.0           0.01                0.0005   
9           [10]        gd       0.9           0.01                0.0005   
10          [10]  momentum       0.0           0.01                0.0005   
11          [10]  momentum       0.9           0.01                0.0005   