In [None]:
# ==========================================================
# Maestría en Ciencia y Análisis de Datos
# Universidad Mayor de San Andrés
# ----------------------------------------------------------
#            Machine Learning y Deep Learning
# ----------------------------------------------------------
#         Rolando Gonzales Martinez, Agosto 2024
# ==========================================================
#      Redes neuronales (perceptron) multicapa con
#     una capa oculta, para la prediccion de BitCoin
# ==========================================================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split

In [None]:
# ---------------------- Loading data ------------------------------------------
# Raw GitHub location of the BTC/USD CSV file.
url = "https://raw.githubusercontent.com/rogon666/UMSA/main/AIMLDL/Datos/BTCUSD.csv"
# Read the CSV directly from the URL into a DataFrame.
data = pd.read_csv(filepath_or_buffer=url)
# Print a plain-text preview of the first five rows.
print(data.head(5))

In [None]:
# Sigmoid activation function
def sigmoid(z):
    """Return the logistic sigmoid ``1 / (1 + exp(-z))``, element-wise.

    The value is evaluated as ``exp(-log(1 + exp(-z)))`` through
    ``np.logaddexp`` so that large negative inputs do not overflow inside
    ``exp`` (the naive expression emits a RuntimeWarning for ``z`` below
    roughly -709).

    Parameters
    ----------
    z : scalar or array-like of numbers
        Pre-activation value(s).

    Returns
    -------
    NumPy scalar or array with the same shape as ``z``; every value lies
    in the closed interval [0, 1].
    """
    # logaddexp(0, -z) == log(exp(0) + exp(-z)) == log(1 + exp(-z)),
    # computed without ever forming exp() of a large positive number.
    return np.exp(-np.logaddexp(0.0, np.negative(z)))

# Derivative of the sigmoid, expressed through the sigmoid's own output
def sigmoid_derivative(z):
    """Return ``z * (1 - z)``, element-wise.

    This equals the slope of the sigmoid only when ``z`` is already a
    sigmoid output (an activation), not a pre-activation value.
    """
    complement = 1 - z
    return z * complement

# Mean squared error loss
def mse_loss(y_true, y_pred):
    """Return the mean of the squared element-wise differences.

    The difference ``y_true - y_pred`` follows NumPy broadcasting and the
    mean is taken over every element of the result.
    """
    residuals = y_true - y_pred
    return np.mean(np.square(residuals))

# Simple MLP with one hidden layer
class MLP:
    """Multilayer perceptron with a single sigmoid hidden layer.

    Training is full-batch gradient descent. The weight updates follow the
    gradient of one half of the *summed* squared error, so the effective
    step size grows with the number of rows passed in; the loss reported by
    ``train`` is the mean squared error.

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_size : int
        Number of units in the hidden layer.
    output_size : int
        Number of output units.
    output_activation : {"sigmoid", "linear"}, default "sigmoid"
        Activation of the output layer. ``"sigmoid"`` (the original
        behavior) confines predictions to (0, 1); ``"linear"`` leaves the
        output unbounded, which suits regression on unbounded targets.
    seed : int or None, default None
        Seed for the weight initialization. With ``None`` the weights are
        drawn from NumPy's global generator (so ``np.random.seed`` still
        controls them); with an integer a private ``RandomState`` is used.

    Raises
    ------
    ValueError
        If ``output_activation`` is not one of the supported names.
    """

    def __init__(self, input_size, hidden_size, output_size,
                 output_activation="sigmoid", seed=None):
        if output_activation not in ("sigmoid", "linear"):
            raise ValueError(
                "output_activation must be 'sigmoid' or 'linear', "
                f"got {output_activation!r}"
            )
        self.output_activation = output_activation

        # Both the np.random module and a RandomState expose randn(); the
        # draw order below matches the original so a global seed yields the
        # same initial weights as before.
        rng = np.random if seed is None else np.random.RandomState(seed)
        self.weights_input_hidden = rng.randn(input_size, hidden_size)
        self.weights_hidden_output = rng.randn(hidden_size, output_size)
        self.bias_hidden = rng.randn(hidden_size)
        self.bias_output = rng.randn(output_size)

    @staticmethod
    def _as_column_targets(y):
        """Return ``y`` as an array, turning a 1-D vector into one column."""
        y = np.asarray(y)
        # A 1-D target of length n would broadcast against the (n, 1)
        # network output into an (n, n) matrix; make it (n, 1) instead.
        return y.reshape(-1, 1) if y.ndim == 1 else y

    def forward(self, X):
        """Run a forward pass and return the output-layer activations.

        The intermediate values ``z1``, ``a1``, ``z2`` and ``a2`` are stored
        on the instance because ``backward`` reads ``a1``.
        """
        self.z1 = np.dot(X, self.weights_input_hidden) + self.bias_hidden
        self.a1 = sigmoid(self.z1)
        self.z2 = np.dot(self.a1, self.weights_hidden_output) + self.bias_output
        if self.output_activation == "sigmoid":
            self.a2 = sigmoid(self.z2)
        else:
            self.a2 = self.z2
        return self.a2

    def backward(self, X, y, output, learning_rate):
        """Backpropagate the error of ``output`` and update weights in place.

        Must be called right after ``forward(X)`` on the same ``X``, since
        it relies on the hidden activations stored by that call.
        """
        y = self._as_column_targets(y)

        # Output-layer error signal; a linear output has derivative 1.
        output_error = y - output
        if self.output_activation == "sigmoid":
            output_delta = output_error * sigmoid_derivative(output)
        else:
            output_delta = output_error

        # Hidden-layer error signal, using the weights *before* the update.
        hidden_error = np.dot(output_delta, self.weights_hidden_output.T)
        hidden_delta = hidden_error * sigmoid_derivative(self.a1)

        # Gradient step on weights and biases (deltas are summed over rows).
        self.weights_hidden_output += np.dot(self.a1.T, output_delta) * learning_rate
        self.bias_output += np.sum(output_delta, axis=0) * learning_rate
        self.weights_input_hidden += np.dot(X.T, hidden_delta) * learning_rate
        self.bias_hidden += np.sum(hidden_delta, axis=0) * learning_rate

    def train(self, X, y, epochs, learning_rate):
        """Train for ``epochs`` full-batch iterations.

        Returns
        -------
        list
            MSE of each epoch, measured on the forward pass made before
            that epoch's weight update. The loss is printed every 100
            epochs, starting with epoch 0.
        """
        y = self._as_column_targets(y)
        loss_history = []
        for epoch in range(epochs):
            output = self.forward(X)
            loss = mse_loss(y, output)
            self.backward(X, y, output, learning_rate)
            loss_history.append(loss)
            if epoch % 100 == 0:
                print(f"Epoch {epoch}, Loss: {loss}")
        return loss_history

In [None]:
# Build the target (y): closing prices as a single-column 2-D array
y = np.reshape(data['Close'].values, (-1, 1))

# Build the feature matrix X from lagged values of y
def create_lagged_features(y, lag=1):
    """Turn a single-column series into lagged features and aligned targets.

    Parameters
    ----------
    y : 2-D array whose column 0 holds the series
    lag : int, default 1
        Number of previous observations used as features.

    Returns
    -------
    X : float array of shape (len(y) - lag, lag)
        Column ``k`` holds the value observed ``k + 1`` steps before the
        target of the same row (column 0 is the most recent one).
    y_lagged : ``y[lag:]``, the targets aligned with the rows of ``X``.
    """
    n_rows = len(y) - lag
    X = np.zeros((n_rows, lag))
    # Walk the start offsets from the most recent lag down to the oldest.
    for col, start in enumerate(range(lag - 1, -1, -1)):
        X[:, col] = y[start:start + n_rows, 0]
    return X, y[lag:]

# Number of lagged closing prices used as features
lag = 60
X_raw, y_raw = create_lagged_features(y, lag)

# Chronological split (no shuffling): the last 15% of the rows is the test set
X_train_raw, X_test_raw, y_train_raw, y_test_raw = train_test_split(
    X_raw, y_raw, test_size=0.15, shuffle=False
)

# Standardization statistics are taken from the training rows only, so no
# information from the test period leaks into the scaling.
x_mean = X_train_raw.mean(axis=0)
x_std = X_train_raw.std(axis=0)
# Kept in dedicated variables: they are needed later to map predictions back
# to prices. (Re-reading them from the already standardized target would give
# roughly 0 and 1 and turn the de-normalization into a no-op.)
y_mean = y_train_raw.mean()
y_std = y_train_raw.std()

X_train = (X_train_raw - x_mean) / x_std
X_test = (X_test_raw - x_mean) / x_std
y_train = (y_train_raw - y_mean) / y_std
y_test = (y_test_raw - y_mean) / y_std

# Full standardized arrays, kept under their original names
X = (X_raw - x_mean) / x_std
y_lagged = (y_raw - y_mean) / y_std

# Model initialization (global seed so the random weights are reproducible)
SEED = 42
np.random.seed(SEED)
input_size = X_train.shape[1]
hidden_size = 1  # Can be tuned
output_size = 1
mlp = MLP(input_size, hidden_size, output_size)
# NOTE(review): MLP.forward squashes its output with a sigmoid, so predictions
# are confined to (0, 1) in standardized units, i.e. between y_mean and
# y_mean + y_std once de-normalized; prices outside that band cannot be fitted.

# Model training
epochs = 100
learning_rate = 0.1
loss_history = mlp.train(X_train, y_train, epochs, learning_rate)

# Predictions on the training and test sets
y_pred_train = mlp.forward(X_train)
y_pred_test = mlp.forward(X_test)

# De-normalize so that errors and plots are expressed in price units
y_train_actual = y_train * y_std + y_mean
y_test_actual = y_test * y_std + y_mean
y_pred_train_actual = y_pred_train * y_std + y_mean
y_pred_test_actual = y_pred_test * y_std + y_mean

# Model evaluation
mae_train = mean_absolute_error(y_train_actual, y_pred_train_actual)
rmse_train = np.sqrt(mean_squared_error(y_train_actual, y_pred_train_actual))
mae_test = mean_absolute_error(y_test_actual, y_pred_test_actual)
rmse_test = np.sqrt(mean_squared_error(y_test_actual, y_pred_test_actual))

print(f"MAE (train): {mae_train}")
print(f"RMSE (train): {rmse_train}")
print(f"MAE (test): {mae_test}")
print(f"RMSE (test): {rmse_test}")

# Plots: training loss on top, test-set predictions below
fig, (ax_loss, ax_pred) = plt.subplots(2, 1, figsize=(14, 7))

# Loss during training
ax_loss.plot(loss_history, label='Loss')
ax_loss.set_title('Evolución de la pérdida durante el entrenamiento')
ax_loss.set_xlabel('Epocas (Epochs)')
ax_loss.set_ylabel('Funcion de perdida MSE')
ax_loss.legend()

# Predictions vs. observed values
ax_pred.plot(y_test_actual, label='Valores observados')
ax_pred.plot(y_pred_test_actual, label='Valores predichos')
ax_pred.set_title('Comparación entre los precios reales y predichos')
ax_pred.set_xlabel('Días')
ax_pred.set_ylabel('Precio Bitcoin')
ax_pred.legend()

fig.tight_layout()
plt.show()
