In [16]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [128]:
# Load the "Bitcoin Historical Data" CSV, parse the Date column, use it as the
# index and order the rows chronologically.
df = (
    pd.read_csv("https://raw.githubusercontent.com/lihkir/Data/refs/heads/main/Bitcoin%20Historical%20Data.csv")
    .assign(Date=lambda frame: pd.to_datetime(frame["Date"]))
    .set_index("Date")
    .sort_index()
)

In [129]:
# Univariate series used by the rest of the notebook: the "Price" column.
timeserie = df.loc[:, "Price"]

In [166]:
def split_time_series(data, tau, train_size=7, jump=0, dim_train_size=14, dim_tau_size=7):
    """
    Slice a series into sliding (X, y) windows grouped into train/val/test folds.

    A window starting at position ``start`` uses
    ``data.iloc[start : start + train_size + jump]`` as input ``X`` and the
    following ``tau`` observations as target ``y``. Consecutive windows (step 1)
    are grouped into "dimensions": within each dimension the first
    ``dim_train_size`` windows are labelled ``"train"``, the next
    ``dim_tau_size`` ``"val"`` and the last ``dim_tau_size`` ``"test"``.
    A dimension is only started if all of its windows fit; a trailing partial
    dimension is dropped.

    Parameters
    ----------
    data : pandas Series (or DataFrame)
        Object supporting ``len`` and ``.iloc`` slicing.
    tau : int
        Number of observations in each target ``y`` (forecast horizon).
    train_size : int, default 7
        Base number of observations in each input window.
    jump : int, default 0
        Extra observations added to the end of each input window before the
        target starts.
    dim_train_size : int, default 14
        Number of train windows per dimension.
    dim_tau_size : int, default 7
        Number of validation windows, and also of test windows, per dimension.

    Returns
    -------
    list of dict
        One dict per window with keys ``"id"`` (1-based running counter),
        ``"dim"`` (1-based dimension number), ``"split"``
        (``"train"``/``"val"``/``"test"``), ``"X"`` and ``"y"``. ``"X"`` and
        ``"y"`` are one-element lists wrapping the ``.iloc`` slices.

    Raises
    ------
    ValueError
        If a dimension would contain no windows.
    """
    total_dim = dim_train_size + dim_tau_size * 2
    if dim_train_size < 0 or dim_tau_size < 0 or total_dim == 0:
        raise ValueError("dim_train_size and dim_tau_size must describe at least one window per dimension.")

    n_samples = len(data)

    # Exclusive upper bound for window starts. `jump` lengthens every window, so
    # it has to be subtracted here (the previous `+ jump` let windows run past
    # the end of the series for jump > 0). The 2 * tau margin is kept unchanged,
    # so results for the default jump=0 are identical to before.
    n_starts = n_samples - train_size - tau * 2 - jump

    splits = []
    id_counter = 1
    dim = 1
    split_counter = 0  # position of the current window inside its dimension

    for start in range(n_starts):
        # Only open a new dimension if all of its windows are available.
        if split_counter == 0 and start + total_dim > n_starts:
            break

        x_end = start + train_size + jump
        y_end = x_end + tau

        if split_counter < dim_train_size:
            split_type = "train"
        elif split_counter < dim_train_size + dim_tau_size:
            split_type = "val"
        else:
            split_type = "test"

        splits.append({
            "id": id_counter,
            "dim": dim,
            "split": split_type,
            "X": [data.iloc[start:x_end]],
            "y": [data.iloc[x_end:y_end]],
        })

        id_counter += 1
        split_counter += 1

        # Dimension complete: move on to the next one.
        if split_counter >= total_dim:
            dim += 1
            split_counter = 0

    return splits

# Build the windows with a one-observation target (tau=1) and tabulate them,
# one row per window.
array = split_time_series(timeserie, tau=1)
df_model = pd.DataFrame(data=array)

In [169]:
# Preview the first five windows.
df_model.head(n=5)

Unnamed: 0,id,dim,split,X,y
0,1,1,train,"[[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]]",[[0.1]]
1,2,1,train,"[[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]]",[[0.1]]
2,3,1,train,"[[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]]",[[0.1]]
3,4,1,train,"[[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]]",[[0.1]]
4,5,1,train,"[[0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]]",[[0.1]]


In [143]:

def plot_splits(data, splits, horizon, dim=1):
    """
    Plot the full series and shade the train / validation / test windows of one dimension.

    Parameters:
    - data: pandas DataFrame or Series, the complete time series.
    - splits: list of dicts, one per window, with keys 'dim', 'split', 'X' and 'y'
      ('X' and 'y' are one-element lists whose item exposes an ``.index``).
    - horizon: int, forecast horizon; only used in the figure title.
    - dim: int, the dimension whose windows are drawn.

    Prints a message and returns without plotting when no window belongs to ``dim``.
    """
    selected = [entry for entry in splits if entry['dim'] == dim]

    if len(selected) == 0:
        print(f"No hay splits para la dimensión {dim}.")
        return

    # (first index of X, last index of y) for every window, bucketed by split type.
    spans = {'train': [], 'val': [], 'test': []}
    for entry in selected:
        kind = entry['split']
        inputs = entry['X'][0]
        targets = entry['y'][0]
        span = (inputs.index[0], targets.index[-1])
        if kind in spans:
            spans[kind].append(span)

    plt.figure(figsize=(15, 5))
    plt.plot(data.index, data, label='Datos', color='black')

    # Shade each bucket; only the first span of a bucket carries the legend
    # label so the legend shows one entry per split type.
    shading = (
        ('train', 'green', 'Entrenamiento'),
        ('val', 'blue', 'Validación'),
        ('test', 'red', 'Prueba'),
    )
    for kind, color, label in shading:
        for position, (begin, finish) in enumerate(spans[kind]):
            legend_kwargs = {'label': label} if position == 0 else {}
            plt.axvspan(begin, finish, color=color, alpha=0.4, **legend_kwargs)

    plt.title(f'Dimensión {dim} - Horizonte de Predicción: {horizon} días')
    plt.xlabel('Índice de Tiempo')
    plt.ylabel('Valor')
    plt.legend()
    plt.show()


In [None]:
from sklearn.metrics import mean_absolute_percentage_error, mean_absolute_error, mean_squared_error, r2_score

def calculate_metrics(y_true, y_pred):
    """
    Compute regression error metrics for a set of predictions.

    Returns a tuple ``(mape, mae, rmse, mse, r2)`` where every value comes from
    the corresponding scikit-learn metric and ``rmse`` is the square root of
    ``mse``.
    """
    abs_pct_error = mean_absolute_percentage_error(y_true, y_pred)
    abs_error = mean_absolute_error(y_true, y_pred)
    squared_error = mean_squared_error(y_true, y_pred)
    root_squared_error = np.sqrt(squared_error)
    determination = r2_score(y_true, y_pred)
    return abs_pct_error, abs_error, root_squared_error, squared_error, determination


In [161]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, Dropout
from tensorflow.keras.optimizers import Adam

In [162]:
def create_and_train_model(model_type, neurons, dropout_rate, batch_size, X_train, y_train, X_val, y_val,
                           epochs=100):
    """
    Build, compile and fit a single-hidden-layer Keras regressor.

    The network is ``hidden layer -> Dropout -> Dense(1)``. The hidden layer
    depends on ``model_type``:

    - ``'MLP'``: ``Dense`` with ReLU; input shape ``(X_train.shape[1],)``.
    - ``'RNN'``: ``SimpleRNN`` with tanh; input shape
      ``(X_train.shape[1], X_train.shape[2])``.
    - ``'LSTM'``: ``LSTM`` with tanh; same input shape as ``'RNN'``.

    Parameters
    ----------
    model_type : str
        One of ``'MLP'``, ``'RNN'``, ``'LSTM'``.
    neurons : int
        Units in the hidden layer.
    dropout_rate : float
        Rate passed to the ``Dropout`` layer.
    batch_size : int
        Batch size passed to ``fit``.
    X_train, y_train : array-like
        Training data; ``X_train`` must expose ``.shape`` (2 axes for MLP,
        3 axes for RNN/LSTM).
    X_val, y_val : array-like
        Data passed to ``fit`` as ``validation_data``.
    epochs : int, default 100
        Number of training epochs (previously hard-coded).

    Returns
    -------
    (model, history)
        The fitted model and the object returned by ``model.fit``.

    Raises
    ------
    ValueError
        If ``model_type`` is not supported.
    """
    if model_type == 'MLP':
        hidden = Dense(neurons, activation='relu', input_shape=(X_train.shape[1],))
    elif model_type == 'RNN':
        hidden = SimpleRNN(neurons, activation='tanh', input_shape=(X_train.shape[1], X_train.shape[2]))
    elif model_type == 'LSTM':
        hidden = LSTM(neurons, activation='tanh', input_shape=(X_train.shape[1], X_train.shape[2]))
    else:
        raise ValueError("Tipo de modelo no soportado.")

    # Every architecture shares the same Dropout + single-output head.
    model = Sequential()
    model.add(hidden)
    model.add(Dropout(dropout_rate))
    model.add(Dense(1))

    model.compile(optimizer=Adam(), loss='mse', metrics=['mae'])
    history = model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,
                        validation_data=(X_val, y_val), verbose=0)
    return model, history


In [None]:
import statsmodels.api as sm
from statsmodels.stats.diagnostic import acorr_ljungbox
# jarque_bera is provided by statsmodels.stats.stattools; it is not exported by
# statsmodels.stats.diagnostic, so the previous combined import failed.
from statsmodels.stats.stattools import jarque_bera

# NOTE(review): X_train, y_train, X_val and y_val are not defined in any cell
# visible above. The code below calls .reshape / .flatten on them, so it assumes
# they are 2-D / 1-D-compatible NumPy arrays -- TODO confirm and build them from
# df_model before running this cell.

# Hyper-parameter grid to explore
dropout_rates = [0.2, 0.4, 0.6, 0.8]
neurons_list = [10, 100, 1000, 10000]
batch_sizes = [16, 32, 64, 128]

# One record per (model type, neurons, dropout, batch size) combination
results = []

for model_type in ['MLP', 'RNN', 'LSTM']:
    # The input layout depends only on the model type, so prepare it once per
    # type: recurrent layers receive a trailing feature axis of size 1, the MLP
    # uses the arrays as they are.
    if model_type in ['RNN', 'LSTM']:
        X_train_reshaped = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
        X_val_reshaped = X_val.reshape((X_val.shape[0], X_val.shape[1], 1))
    else:
        X_train_reshaped = X_train
        X_val_reshaped = X_val

    for neurons in neurons_list:
        for dropout_rate in dropout_rates:
            for batch_size in batch_sizes:
                model, history = create_and_train_model(model_type, neurons, dropout_rate, batch_size,
                                                        X_train_reshaped, y_train, X_val_reshaped, y_val)
                # Predictions
                y_pred_train = model.predict(X_train_reshaped)
                y_pred_val = model.predict(X_val_reshaped)

                # Metrics on the training set
                mape_train, mae_train, rmse_train, mse_train, r2_train = calculate_metrics(y_train, y_pred_train)

                # Statistical tests on the training residuals:
                # Ljung-Box at lag 10 and Jarque-Bera.
                residuals_train = y_train.flatten() - y_pred_train.flatten()
                lb_test = acorr_ljungbox(residuals_train, lags=[10], return_df=True)
                jb_test = jarque_bera(residuals_train)

                # Store the results. 'Model' is the architecture NAME; the
                # fitted Keras model itself is not stored under its own key.
                results.append({
                    'Model': model_type,
                    'Neurons': neurons,
                    'Dropout': dropout_rate,
                    'BatchSize': batch_size,
                    'MAPE': mape_train,
                    'MAE': mae_train,
                    'RMSE': rmse_train,
                    'MSE': mse_train,
                    'R2': r2_train,
                    'Ljung-Box p-value': lb_test['lb_pvalue'].values[0],
                    'Jarque-Bera p-value': jb_test[1],  # second element is the p-value
                    'History': history,
                    'Residuals': residuals_train
                })


In [None]:
# Tabulate the collected records, one row per trained configuration.
results_df = pd.DataFrame(data=results)


In [None]:
# Training-set summary: hyper-parameters, error metrics and residual-test p-values.
table_train = results_df.loc[:, [
    'Model', 'Neurons', 'Dropout', 'BatchSize',
    'MAPE', 'MAE', 'RMSE', 'MSE', 'R2',
    'Ljung-Box p-value', 'Jarque-Bera p-value',
]]


In [None]:
# Repeat the evaluation on the test set for every trained configuration.
# NOTE(review): X_test and y_test are not defined in any cell visible above;
# the code assumes NumPy arrays (uses .reshape / .flatten) -- TODO confirm.
for result in results:
    model_type = result['Model']
    # result['Model'] only holds the architecture name (a string), so calling
    # .predict on it fails. The fitted Keras model is recovered from the History
    # callback returned by fit(), which keeps a reference to its model.
    trained_model = result['History'].model

    # Recurrent models receive a trailing feature axis of size 1
    if model_type in ['RNN', 'LSTM']:
        X_test_reshaped = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))
    else:
        X_test_reshaped = X_test

    y_pred_test = trained_model.predict(X_test_reshaped)
    mape_test, mae_test, rmse_test, mse_test, r2_test = calculate_metrics(y_test, y_pred_test)

    # Independence test (Ljung-Box at lag 10) on the test residuals
    residuals_test = y_test.flatten() - y_pred_test.flatten()
    lb_test = acorr_ljungbox(residuals_test, lags=[10], return_df=True)

    result['MAPE_test'] = mape_test
    result['MAE_test'] = mae_test
    result['RMSE_test'] = rmse_test
    result['MSE_test'] = mse_test
    result['R2_test'] = r2_test
    result['Ljung-Box p-value_test'] = lb_test['lb_pvalue'].values[0]

# results_df was created before the *_test keys were added to the records, so it
# has no such columns; build the test table from the updated records instead.
table_test = pd.DataFrame(results)[['Model', 'Neurons', 'Dropout', 'BatchSize', 'MAPE_test', 'MAE_test', 'RMSE_test', 'MSE_test', 'R2_test', 'Ljung-Box p-value_test']]


In [None]:
import matplotlib.pyplot as plt

# One figure per trained configuration: training vs. validation loss per epoch.
for result in results:
    history = result['History']
    plt.figure(figsize=(10, 6))
    for curve_key, curve_label in (('loss', 'Train Loss'), ('val_loss', 'Validation Loss')):
        plt.plot(history.history[curve_key], label=curve_label)
    plt.title(f"Modelo: {result['Model']}, Neuronas: {result['Neurons']}, Dropout: {result['Dropout']}")
    plt.xlabel('Épocas')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()


In [None]:
# MAE per configuration on each data partition.
train_errors = [result['MAE'] for result in results]
# The validation box previously reused MAE_test, so it duplicated the test box.
# The models are compiled with metrics=['mae'] and fitted with validation_data,
# so the final-epoch validation MAE is taken from the Keras history instead.
val_errors = [result['History'].history['val_mae'][-1] for result in results]
test_errors = [result['MAE_test'] for result in results]

data = [train_errors, val_errors, test_errors]
labels = ['Entrenamiento', 'Validación', 'Prueba']

# Tick labels are set through xticks (boxes sit at positions 1..n by default)
# rather than the `labels=` keyword, which newer matplotlib releases deprecate.
plt.boxplot(data)
plt.xticks(range(1, len(labels) + 1), labels)
plt.ylabel('MAE')
plt.title('Distribución de Errores MAE')
plt.show()


In [None]:
# Configuration with the lowest RMSE.
# NOTE(review): 'RMSE' is the TRAINING-set RMSE stored by the search cell --
# confirm that selecting on training error is intended.
best_result = min(results, key=lambda x: x['RMSE'])

# Predict on the test set.
# best_result['Model'] is only the architecture name (a string); the fitted
# Keras model is recovered from the History callback returned by fit().
model_type = best_result['Model']
trained_model = best_result['History'].model

# Recurrent models receive a trailing feature axis of size 1
if model_type in ['RNN', 'LSTM']:
    X_test_reshaped = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))
else:
    X_test_reshaped = X_test

y_pred_test = trained_model.predict(X_test_reshaped)

# Plot actual vs. predicted values by position in the test set
plt.figure(figsize=(12, 6))
plt.plot(range(len(y_test)), y_test, label='Real')
plt.plot(range(len(y_pred_test)), y_pred_test, label='Predicción')
plt.title('Predicción vs Real en el Conjunto de Prueba')
plt.xlabel('Tiempo')
plt.ylabel('Valor')
plt.legend()
plt.show()


In [163]:
import statsmodels.graphics.api as smg
from statsmodels.graphics.tsaplots import plot_acf

# Residuals stored for the selected configuration
residuals = best_result['Residuals']

# Residual series
plt.figure(figsize=(10, 4))
plt.plot(residuals)
plt.title('Serie de Residuos')
plt.show()

# QQ plot of the residuals
sm.qqplot(residuals, line='s')
plt.title('QQ Plot de Residuos')
plt.show()

# Autocorrelation function of the residuals (20 lags)
plot_acf(residuals, lags=20)
plt.title('ACF de Residuos')
plt.show()

NameError: name 'best_result' is not defined

<Figure size 1000x400 with 0 Axes>