### Auto Bayesian Neural Networks


autoBNN on EUR/USD

In [33]:
import yfinance as yf
import numpy as np
import pandas as pd
import autobnn as ab
import jax
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

In [34]:
# Download EUR/USD data from Yahoo Finance
eurusd = yf.download("EURUSD=X", start="2005-01-01", end="2025-06-22")
eurusd = eurusd.reset_index()

  eurusd = yf.download("EURUSD=X", start="2005-01-01", end="2025-06-22")
[*********************100%***********************]  1 of 1 completed


In [35]:
# Convert the index to a datetime format if necessary
eurusd['Date'] = pd.to_datetime(eurusd['Date'])

In [37]:
# Drop the 'Volume' column if it contains only zeros
if 'Volume' in eurusd.columns:
    eurusd.drop(columns=['Volume'], inplace=True)

In [38]:
def label_data(df, lookahead=5, threshold=0.002):
    """
    Labels each candle based on future closing price percentage change.
    Parameters:
    -----------
    df : pd.DataFrame
        Data containing at least a 'Close' column.
    lookahead : int, optional
        Number of candles to look ahead (default is 5).
    threshold : float, optional
        Percentage change threshold for classification (default is 0.002 or 0.2%).
    """
    
    # Compute future percentage change in closing price
    df["future_return"] = df["Close"].pct_change(lookahead).shift(-lookahead)
    # Assign labels
    df["label"] = 0  # Default: Neutral
    df.loc[df["future_return"] > threshold, "label"] = 2  # Up
    df.loc[df["future_return"] < -threshold, "label"] = 1  # Down
    # Drop future_return column (not needed in final output)
    df.drop(columns=["future_return"], inplace=True)

In [39]:
label_data(eurusd)

  df.drop(columns=["future_return"], inplace=True)


In [40]:
print(eurusd[eurusd["label"] != 0].head(10))

Price        Date     Close      High       Low      Open label
Ticker             EURUSD=X  EURUSD=X  EURUSD=X  EURUSD=X      
0      2005-01-03  1.347001  1.358105  1.340195  1.358105     1
1      2005-01-04  1.328198  1.349601  1.326102  1.347001     1
3      2005-01-06  1.318305  1.327898  1.316396  1.327898     2
4      2005-01-07  1.306097  1.324802  1.302999  1.318200     2
5      2005-01-10  1.310994  1.311992  1.306404  1.307600     1
6      2005-01-11  1.311699  1.317107  1.310496  1.311407     1
7      2005-01-12  1.325592  1.329098  1.308695  1.311407     1
8      2005-01-13  1.321109  1.326594  1.320306  1.325908     1
9      2005-01-14  1.309895  1.321196  1.306797  1.321196     1
11     2005-01-18  1.301795  1.307292  1.300306  1.304904     1


In [41]:
def create_sliding_window_dataset(df, window_size=50, lookahead=5, threshold=0.002):
    if "Close" not in df.columns or "label" not in df.columns:
        raise ValueError("DataFrame must contain 'Close' and 'label' columns.")
    label_data(df=df, lookahead=lookahead, threshold=threshold)
    close_prices = df["Close"].values
    labels = df["label"].values
    n_samples = len(df) - window_size
    if n_samples <= 0:
        raise ValueError("Not enough data to create even one window."
                         "Increase your dataset or decrease window_size.")
    X_list = []
    y_list = []
    for i in range(n_samples):
        window_data = close_prices[i : i + window_size]
        last_candle_label = labels[i + window_size - 1]  # label of the last candle in window_size
        X_list.append(window_data)
        y_list.append(last_candle_label)
    X = np.array(X_list)
    y = np.array(y_list)
    return X, y

In [42]:
# Create sliding window dataset
X, y = create_sliding_window_dataset(eurusd, window_size=50)

# Display the shapes of the resulting datasets
print("Shape of X:", X.shape)
print("Shape of y:", y.shape)

Shape of X: (5259, 50, 1)
Shape of y: (5259,)


  df.drop(columns=["future_return"], inplace=True)


In [43]:
def walk_forward_autobnn_ovr(
    df, 
    window_size=100,
    train_size=50,
    step_size=5,
    threshold=0.002,
    n_classes=3
    ):
    # 1. Build your dataset
    X, y = create_sliding_window_dataset(df=df, window_size=window_size, threshold=threshold)
    
    # Ensure X is 2D
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    elif X.ndim > 2:
        X = X.reshape(X.shape[0], -1)
    
    n_total = len(X)
    if n_total < train_size + 1:
        raise ValueError("Not enough samples for walk-forward analysis.")
    
    accuracies = []
    
    # 2. Walk-forward loop
    for i in range(0, n_total - train_size, step_size):
        X_train = X[i : i + train_size]
        y_train = y[i : i + train_size]
        
        # Ensure X_train is 2D
        if X_train.ndim == 1:
            X_train = X_train.reshape(-1, 1)
        elif X_train.ndim > 2:
            X_train = X_train.reshape(X_train.shape[0], -1)
        
        test_index = i + train_size
        if test_index >= n_total:
            break
            
        X_test = X[test_index : test_index + 1]
        y_test = y[test_index : test_index + 1]
        
        # Ensure X_test is 2D
        if X_test.ndim == 1:
            X_test = X_test.reshape(-1, 1)
        elif X_test.ndim > 2:
            X_test = X_test.reshape(X_test.shape[0], -1)
        
        print(f"Training shape: {X_train.shape}, Test shape: {X_test.shape}")
        
        # Scale
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # 3. Train one binary model per class
        estimators = []
        for c in range(n_classes):
            # Make binary labels: 1 if y == c, else 0
            y_train_c = (y_train == c).astype(int)
            model_c = ab.operators.Add(
                bnns=(
                    ab.kernels.PeriodicBNN(width=20, period=12.0),
                    ab.kernels.LinearBNN(width=20),
                    ab.kernels.MaternBNN(width=20),
                )
            )
            estimator_c = ab.estimators.AutoBnnMapEstimator(
                model_c,
                likelihood_model="normal_likelihood_logistic_noise",  
                seed=jax.random.PRNGKey(42),
                periods=[12],
            )

            estimator_c.fit(X_train_scaled, y_train_c)
            estimators.append(estimator_c)

        # 4. Predict probabilities for each class on the single test sample
        class_probs = []
        for c in range(n_classes):
            y_pred_c = estimators[c].predict(X_test_scaled)
            
            if y_pred_c.ndim == 2:
                logit = y_pred_c[0, 0]  # first row, mean col
            else:
                logit = y_pred_c[0]  # shape (1,) => just the mean
            
            prob_c = 1.0 / (1.0 + np.exp(-logit))
            class_probs.append(prob_c)

        # 5. Pick the class with the highest probability
        y_pred_class = np.argmax(class_probs)

        # 6. Single-sample accuracy (0 or 1)
        acc = accuracy_score(y_test, [y_pred_class])
        accuracies.append(acc)
    
    return accuracies


In [45]:
dfsample = eurusd[:1000].copy()
results = walk_forward_autobnn_ovr(
    df=dfsample,
    window_size=40,
    train_size=20,
    step_size=5,
    threshold=0.002,
    n_classes=3
)

  df.drop(columns=["future_return"], inplace=True)


Training shape: (20, 40), Test shape: (1, 40)


AttributeError: module 'jax.experimental.shard_map' has no attribute 'register_check'