### [Forecasting Bitcoin Autocorrelation](https://pyquantlab.medium.com/forecasting-bitcoin-autocorrelation-with-74-directional-accuracy-using-lstms-59ba7395fd48)

> 74% Directional Accuracy using LSTMs

In [1]:
!pip install -q numpy pandas yfinance matplotlib
!pip install -q scikit-learn "tensorflow==2.18.0"

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from math import sqrt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping
import datetime

In [4]:
# Data and Feature Parameters
ticker = 'BTC-USD'
start_date = '2023-01-01'
end_date = datetime.datetime.now().strftime('%Y-%m-%d')
rolling_window = 30 # Window for calculating autocorrelation
lag = 1             # Lag for autocorrelation (day-over-day)

# Model Hyperparameters
num_lags = 90       # How many past autocorrelation values to use as input
train_test_split = 0.80 # 80% for training, 20% for testing
num_neurons_in_hidden_layers = 128 # LSTM layer size
num_epochs = 100    # Max training epochs
batch_size = 20     # Samples per gradient update
dropout_rate = 0.1  # Regularization rate

In [5]:
print(f"Fetching {ticker} data from {start_date} to {end_date}...")
data = yf.download(ticker, start=start_date, end=end_date)
# Clean up potential multi-level columns from yfinance
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.droplevel(1)
data = data['Close'] # We only need closing prices
data = data.dropna()
print(f"Data fetched successfully. Shape: {data.shape}")

Fetching BTC-USD data from 2023-01-01 to 2025-04-29...
YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed

Data fetched successfully. Shape: (849,)





In [6]:
print(f"Calculating {rolling_window}-day rolling autocorrelation (lag={lag})...")
rolling_autocorr_series = data.rolling(
    window=rolling_window
).apply(lambda x: x.autocorr(lag=lag), raw=False) # Use pandas Series method

rolling_autocorr = rolling_autocorr_series.dropna().values # Drop initial NaNs
rolling_autocorr = np.reshape(rolling_autocorr, (-1)) # Ensure 1D shape
print(f"Rolling autocorrelation calculated. Shape: {rolling_autocorr.shape}")

Calculating 30-day rolling autocorrelation (lag=1)...
Rolling autocorrelation calculated. Shape: (820,)


In [7]:
def data_preprocessing(data_series, n_lags, train_split_ratio):
    """
    Prepares time series data into lags for supervised learning and splits.
    """
    X, y = [], []
    # Create sequences: Use 'n_lags' points to predict the next point
    for i in range(n_lags, len(data_series)):
        X.append(data_series[i-n_lags:i])
        y.append(data_series[i])
    X, y = np.array(X), np.array(y)

    # Split into training and testing sets
    split_index = int(len(X) * train_split_ratio)
    x_train = X[:split_index]
    y_train = y[:split_index]
    x_test = X[split_index:]
    y_test = y[split_index:]
    print(f"Data shapes: X_train={x_train.shape}, y_train={y_train.shape}, X_test={x_test.shape}, y_test={y_test.shape}")
    return x_train, y_train, x_test, y_test

# Create the datasets
x_train, y_train, x_test, y_test = data_preprocessing(
    rolling_autocorr, num_lags, train_test_split
)

Data shapes: X_train=(584, 90), y_train=(584,), X_test=(146, 90), y_test=(146,)


In [8]:
# Reshape Input for LSTM [samples, time steps, features]
x_train = x_train.reshape((-1, num_lags, 1))
x_test = x_test.reshape((-1, num_lags, 1))
print(f"Data reshaped for LSTM: x_train={x_train.shape}, x_test={x_test.shape}")

Data reshaped for LSTM: x_train=(584, 90, 1), x_test=(146, 90, 1)


In [9]:
print("Building LSTM model...")
model = Sequential()
model.add(LSTM(units=num_neurons_in_hidden_layers, input_shape=(num_lags, 1)))
model.add(BatchNormalization()) # Regularization / Stability
model.add(Dropout(dropout_rate)) # Regularization
model.add(Dense(units=1))       # Output layer

# Compile: Define loss function and optimizer
model.compile(loss='mean_squared_error', optimizer='adam')
model.summary() # Display model structure

Building LSTM model...


In [10]:
# Early stopping implementation
early_stopping = EarlyStopping(monitor='loss', patience=15,
                             restore_best_weights=True, verbose=1)

print("Training model...")
history = model.fit(x_train, y_train,
                    epochs=num_epochs,
                    batch_size=batch_size,
                    callbacks=[early_stopping],
                    verbose=1,
                    shuffle=False) # Keep temporal order if needed
print("Training finished.")
if early_stopping.stopped_epoch > 0:
    print(f"Early stopping triggered at epoch {early_stopping.stopped_epoch + 1}")

Training model...
Epoch 1/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 57ms/step - loss: 0.5166
Epoch 2/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 59ms/step - loss: 0.0782
Epoch 3/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 62ms/step - loss: 0.0236
Epoch 4/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 63ms/step - loss: 0.0299
Epoch 5/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 60ms/step - loss: 0.0274
Epoch 6/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 66ms/step - loss: 0.0181
Epoch 7/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 62ms/step - loss: 0.0171
Epoch 8/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 63ms/step - loss: 0.0199
Epoch 9/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 70ms/step - loss: 0.0195
Epoch 10/100
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s

In [11]:
print("Predicting...")
y_predicted_train = model.predict(x_train).flatten()
y_predicted_test = model.predict(x_test).flatten()

# Prepare actual values (flatten)
y_train_flat = y_train.flatten()
y_test_flat = y_test.flatten()

Predicting...
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 53ms/step
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step


In [12]:
def calculate_directional_accuracy(actual, predicted):
    actual = np.asarray(actual)
    predicted = np.asarray(predicted)

    if len(actual) != len(predicted):
        raise ValueError("Actual and predicted arrays must be of the same length.")
    if len(actual) < 2:
        raise ValueError("Need at least two points to compute directional accuracy.")

    # Compute daily changes
    actual_diff = np.diff(actual)
    predicted_diff = np.diff(predicted)

    # Calculate direction: sign of the difference (+1, -1, or 0)
    actual_direction = np.sign(actual_diff)
    predicted_direction = np.sign(predicted_diff)

    # Count how many times the directions matched
    correct_direction = actual_direction == predicted_direction
    directional_accuracy = np.mean(correct_direction) * 100  # as a percentage

    return directional_accuracy

In [13]:
print("Evaluating performance...")
# Calculate Metrics
rmse_train = sqrt(mean_squared_error(y_train_flat, y_predicted_train))
rmse_test = sqrt(mean_squared_error(y_test_flat, y_predicted_test))

# (Assuming calculate_directional_accuracy function is defined as above)
accuracy_train = calculate_directional_accuracy(y_train_flat, y_predicted_train)
accuracy_test = calculate_directional_accuracy(y_test_flat, y_predicted_test)

min_len_train = min(len(y_train_flat), len(y_predicted_train))
min_len_test = min(len(y_test_flat), len(y_predicted_test))
correlation_train = np.corrcoef(y_train_flat[:min_len_train], y_predicted_train[:min_len_train])[0, 1]
correlation_test = np.corrcoef(y_test_flat[:min_len_test], y_predicted_test[:min_len_test])[0, 1]

# Print Results
print("\n--- Results ---")
print(f"RMSE (Train): {rmse_train}")
print(f"RMSE (Test): {rmse_test}")

print(f"Directional Accuracy (Train): {accuracy_train}")
print(f"Directional Accuracy (Test): {accuracy_test}")

print(f"Minimum length of train dataset: {min_len_train}")
print(f"Minimum length of test dataset: {min_len_test}")

print(f"Correlation In-Sample Predicted/Train: {correlation_train}")
print(f"Correlation Out-of-Sample Predicted/Test: {correlation_test}")
print("---------------\n")

Evaluating performance...

--- Results ---
RMSE (Train): 0.20620331707603212
RMSE (Test): 0.19150318450092627
Directional Accuracy (Train): 71.35506003430532
Directional Accuracy (Test): 73.79310344827587
Minimum length of train dataset: 584
Minimum length of test dataset: 146
Correlation In-Sample Predicted/Train: 0.9686713968669628
Correlation Out-of-Sample Predicted/Test: 0.9594252771398107
---------------



In [None]:
print("Plotting results...")
# (Assuming plot_train_test_values function is defined as above)
plot_train_test_values(n_train_plot=300, n_test_plot=len(y_test_flat),
                       y_train=y_train_flat,
                       y_test=y_test_flat,
                       y_predicted=y_predicted_test)