<h1><center>Bitcoin Price Forecasting using LSTM Neural Networks</center></h1>

<h3><center>CE418 - Neuro-Fuzzy Computing</center></h3> 
<h3><center>Aslanidou Aikaterina-Sofia </center></h3>  
<h3><center>University of Thessaly</center></h3>


<h3> About This Notebook </h3>        <b>This notebook presents the implementation of our project, including concise explanations and inline comments where necessary. A more detailed discussion, analysis, and evaluation of the results can be found in the accompanying report.</b>.


<h4>Importing Necessary Libraries</h4>    
In this section, we import the essential Python libraries required for data preprocessing, model training, evaluation, and visualization.  

In [122]:
import numpy as np
import pandas as pd
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization, Bidirectional, Input
from tensorflow.keras.optimizers import Adam
import time
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

# For ARIMA
from statsmodels.tsa.arima.model import ARIMA

### Mean Absolute Percentage Error (MAPE) Function
The mean_absolute_percentage_error function calculates the Mean Absolute Percentage Error (MAPE) between the true values (y_true) and the predicted values (y_pred). This metric expresses the error as a percentage, making it useful for evaluating prediction accuracy in regression problems.


In [124]:
def mean_absolute_percentage_error(y_true, y_pred):
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100

### DataLoader class
The DataLoader class is responsible for loading, processing, and formatting Bitcoin price data for use in an LSTM-based neural network. It reads data from a CSV file and ensures the date column is in datetime format, sorts the data chronologically, selects only the date and close price columns, handles missing values, and resamples the data to an hourly frequency using forward fill. Additionally, it applies MinMax scaling to normalize the close prices. The class provides methods to filter and scale data within a specific date range, create sequences of sequence_length for time series prediction, split the data into training, validation, and test sets, and verify generated sequences with raw close prices.

In [126]:
class DataLoader:
    def __init__(self, filepath):
        print("=== Initializing DataLoader ===")
        self.data = pd.read_csv(filepath)

        # Make sure 'date' is recognized as datetime
        self.data['date'] = pd.to_datetime(self.data['date'], errors='coerce')

        # Sort by date
        self.data.sort_values('date', inplace=True)
        print(f"Initial CSV shape: {self.data.shape}")

        # Keep only the columns we need
        self.data = self.data[['date', 'close']]

        # Check for any NA in 'close'
        na_count = self.data['close'].isna().sum()
        print(f"NaNs in 'close' before resampling: {na_count}")

        # Set date as index and resample to hourly frequency
        self.data.set_index('date', inplace=True)
        self.data = self.data.resample('h').ffill()  
        print(f"Shape after resampling: {self.data.shape}")

        # Reset index back to columns
        self.data.reset_index(inplace=True)
        print("First few rows after resampling + forward-fill:")
        print(self.data.head(5))

        # Initialize MinMaxScaler
        self.scaler = MinMaxScaler(feature_range=(0, 1))

    def format_data(self, start_date, end_date):
        """
        Filters data within the given date range and applies MinMax scaling to 'close' -> 'scaled_close'.
        """
        print(f"\n=== format_data for {start_date} to {end_date} ===")
        mask = (
            (self.data['date'] >= pd.to_datetime(start_date)) &
            (self.data['date'] <= pd.to_datetime(end_date))
        )
        data_range = self.data.loc[mask].copy()
        print(f"  => Data range shape: {data_range.shape}")

        if data_range.empty:
            print("  WARNING: data_range is EMPTY! Check your CSV date range or date filter.")
            data_range['scaled_close'] = np.nan
            return data_range

        # Scale 'close'
        data_range['scaled_close'] = self.scaler.fit_transform(data_range[['close']])
        print("  => Scaled data stats:")
        print("     close.min =", data_range['close'].min())
        print("     close.max =", data_range['close'].max())
        print("     scaled_close.min =", data_range['scaled_close'].min())
        print("     scaled_close.max =", data_range['scaled_close'].max())

        # Check if scaled_close has any NaN
        nan_count = data_range['scaled_close'].isna().sum()
        print(f"  => NaNs in scaled_close: {nan_count}")

        return data_range

    def create_sequences(self, data, timestamps, sequence_length=144):
        """
        Creates sequences of `sequence_length` (6 days). The label is the next hour after the sequence.
        """
        sequences = []
        labels = []
        sequence_timestamps = []

        for i in range(len(data) - sequence_length - 1):
            seq = data[i : i + sequence_length, 0]
            label = data[i + sequence_length, 0]
            sequences.append(seq)
            labels.append(label)
            sequence_timestamps.append(timestamps[i + sequence_length])

        return np.array(sequences), np.array(labels), sequence_timestamps

    def get_train_val_test(self):
        """
        Splits data into training, validation, and test sets.
        """
        print("\n=== get_train_val_test ===")
        train_data = self.format_data('2021-01-01', '2022-01-31')
        val_data   = self.format_data('2022-02-01', '2022-02-20')
        test_data  = self.format_data('2022-02-21', '2022-02-28')
        
        print("\n--- Checking for empties in splits ---")
        print(f"train_data.shape = {train_data.shape}")
        print(f"val_data.shape   = {val_data.shape}")
        print(f"test_data.shape  = {test_data.shape}")

        train_values = train_data[['scaled_close']].values
        val_values   = val_data[['scaled_close']].values
        test_values  = test_data[['scaled_close']].values

        train_timestamps = train_data['date'].values
        val_timestamps   = val_data['date'].values
        test_timestamps  = test_data['date'].values

        X_train, y_train, _ = self.create_sequences(train_values, train_timestamps)
        X_val,   y_val,   _ = self.create_sequences(val_values, val_timestamps)
        X_test,  y_test,  _ = self.create_sequences(test_values, test_timestamps)

        print("\n--- Shapes after create_sequences ---")
        print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
        print(f"X_val:   {X_val.shape},   y_val:   {y_val.shape}")
        print(f"X_test:  {X_test.shape},  y_test:  {y_test.shape}")

        # Check for NaNs:
        print(f"NaN count in X_train: {np.isnan(X_train).sum()}")
        print(f"NaN count in y_train: {np.isnan(y_train).sum()}")

        return X_train, y_train, X_val, y_val, X_test, y_test,train_data, val_data, test_data

    def verify_sequences(self):
        """
        Uses raw 'close' data (unscaled) for a human-readable check.
        """
        print("\n=== verify_sequences (raw data) ===")
        data_values = self.data[['close']].values
        timestamps = self.data['date'].values
        X, y, label_timestamps = self.create_sequences(data_values, timestamps)

        print(f"➡️ Total sequences: {len(X)}")
        for i in range(min(3, len(X))):
            print("\n" + "=" * 70)
            print(f"🔹 Sequence {i+1}")
            print(f"  Start Time: {timestamps[i]}")
            print(f"  End Time:   {timestamps[i + 143]}")
            print(f"  Label Time: {label_timestamps[i]}")
            print(f"  Sequence Values (Last 5): {X[i][-5:].flatten()} ...")
            print(f"  Label Price: {y[i]}")
            print("=" * 70)

        return X, y


### BitcoinPriceLSTM: LSTM-Based Model for Bitcoin Price Prediction
The BitcoinPriceLSTM class implements a Long Short-Term Memory (LSTM) neural network for predicting Bitcoin prices. It uses a Bidirectional LSTM layer to capture both past and future dependencies in sequential data, includes Batch Normalization and Dropout layers to improve generalization and prevent overfitting, and features multiple LSTM and Dense layers with ReLU activation to refine predictions. The model is compiled with the Adam optimizer and mean_squared_error loss for robust training. Key methods include build_model to construct the neural network, train to fit the model to training data, evaluate to measure performance on test data, and predict to generate forecasts.

In [128]:
class BitcoinPriceLSTM:
    def __init__(self):
        self.model = self.build_model()

    def build_model(self):
        model = Sequential()
        
        model.add(Input(shape=(144, 1)))
        model.add(Bidirectional(LSTM(units=20, return_sequences=True)))
        model.add(Dropout(0.2))
        model.add(BatchNormalization())

        model.add(LSTM(units=20, return_sequences=False))
        model.add(Dropout(0.2))

        model.add(Dense(units=20, activation='relu'))
        model.add(Dropout(0.2))

        model.add(Dense(units=1))

        model.compile(optimizer=Adam(), loss='mean_squared_error')
        return model
    
    def train(self, X_train, y_train, X_val, y_val, epochs=5, batch_size=16):
        print(f"\n=== Training on {len(X_train)} samples ===")
        start_time = time.time()
        self.model.fit(
            X_train, y_train, 
            validation_data=(X_val, y_val),
            epochs=epochs, 
            batch_size=batch_size,
            verbose=1
        )
        return time.time() - start_time
    
    def evaluate(self, X_test, y_test):
        loss = self.model.evaluate(X_test, y_test, verbose=0)
        return loss
    
    def predict(self, X):
        return self.model.predict(X, verbose=0)



### ARIMA Helper Functions for Time Series Forecasting
These helper functions facilitate the implementation of the ARIMA (AutoRegressive Integrated Moving Average) model for Bitcoin price forecasting. The prepare_arima_data(df) function converts a DataFrame containing hourly Bitcoin price data into a Pandas Series indexed by date, ensuring close price values are floats to avoid scaling issues and making the data compatible with ARIMA modeling. The walk_forward_arima(train_series, test_series, p=1, d=1, q=1) function implements a walk-forward validation approach, where the ARIMA model predicts one step ahead at each iteration while updating the training history with actual test set values. This ensures a realistic, rolling evaluation of ARIMA’s forecasting ability.

In [130]:
# ============= ARIMA HELPER FUNCTIONS =============

def prepare_arima_data(df):
    """
    Convert the hourly DataFrame (with 'date' and 'close')
    into a Pandas Series indexed by date, suitable for ARIMA.
    """
    # We assume 'df' has columns ['date', 'close'] and is sorted ascending.
    # Set date as index
    temp = df.copy()
    temp.set_index('date', inplace=True)
    # We'll ensure it's float (in case of scaling issues)
    ts = temp['close'].astype(float)
    return ts

def walk_forward_arima(train_series, test_series, p=1, d=1, q=1):
    """
    Perform a simple day-by-day (or hour-by-hour) walk-forward ARIMA forecast.
    train_series and test_series should be Pandas Series with a datetime index.
    """
    history = list(train_series.values)
    predictions = []

    # We iterate over the test set, forecast 1 step at a time
    for t in range(len(test_series)):
        model = ARIMA(history, order=(p, d, q))
        model_fit = model.fit()
        forecast_value = model_fit.forecast(steps=1)[0]
        predictions.append(forecast_value)
        # append the actual value from the test to the history
        history.append(test_series.iloc[t])

    # Convert predictions list -> numpy array for metrics
    predictions = np.array(predictions)
    return predictions


### Main Execution: LSTM vs ARIMA for Bitcoin Price Prediction
This script executes the full pipeline, training and evaluating both an LSTM neural network and an ARIMA model to compare their performance in predicting Bitcoin prices. The workflow includes data preparation, where the dataset is loaded and preprocessed using DataLoader, and then split into training, validation, and test sets. The LSTM model is trained using different training set percentages (20%, 40%, 60%, 80%, 100%) and evaluated on the test set using Mean Squared Error (MSE) and Mean Absolute Percentage Error (MAPE). The ARIMA model is then trained using a walk-forward forecasting approach to simulate real-world forecasting. Finally, the performance of both models is compared using MSE and MAPE to determine which approach is more effective for Bitcoin price prediction.

In [132]:
# ============================= MAIN =============================
if __name__ == "__main__":
    data_loader = DataLoader("dataset.csv")

    # 1. Get the full train/val/test sets
    X_train_full, y_train_full, X_val, y_val, X_test, y_test,df_train, df_val, df_test = data_loader.get_train_val_test()
    
    print("\n--- Post get_train_val_test Stats (FULL TRAIN) ---")
    if len(X_train_full) > 0:
        print("X_train_full max:", np.nanmax(X_train_full), "| min:", np.nanmin(X_train_full))
        print("y_train_full max:", np.nanmax(y_train_full), "| min:", np.nanmin(y_train_full))
    else:
        print("X_train_full is EMPTY or invalid for the specified date range.")

    # 2. Optional: verify unscaled sequences
    data_loader.verify_sequences()

    # 3. Only proceed if we have training data
    total_train_samples = len(X_train_full)
    if total_train_samples == 0:
        print("No training samples available. Check your CSV date range!")
    else:
        # For each percentage, we re-initialize the model, train, then evaluate
        for percentage in [20, 40, 60, 80, 100]:
            subset_size = int(total_train_samples * (percentage / 100))

            print(f"\n================== LSTM TRAINING with {percentage}% of the Data ==================")
            print(f"Subset size: {subset_size}")

            # Subset the training data
            X_train_sub = X_train_full[:subset_size]
            y_train_sub = y_train_full[:subset_size]

            # Build a fresh model for each percentage
            model = BitcoinPriceLSTM()

            # Train
            print(f"Starting training for {percentage}% subset ...")
            train_time = model.train(X_train_sub, y_train_sub, X_val, y_val, epochs=1, batch_size=16)
            print(f"Training time: {train_time:.2f} seconds")

            # Evaluate
            if len(X_test) > 0:
                test_loss = model.evaluate(X_test, y_test)
                print(f"[{percentage}% Data] Test Loss (MSE): {test_loss:.6f}")

                # Predict
                predictions = model.predict(X_test)
                mse  = mean_squared_error(y_test, predictions)
                mape = mean_absolute_percentage_error(y_test, predictions)
                print(f"[{percentage}% Data] MSE on Test:  {mse:.6f}")
                print(f"[{percentage}% Data] MAPE on Test: {mape:.2f}%")
            else:
                print("No test set to evaluate on. Check date ranges!")

            if percentage ==  100:
                best_lstm_mse = mse
                best_lstm_mape = mape
                best_model = model


                # ===================== ARIMA COMPARISON =====================
    print("\n\n=== ARIMA MODEL COMPARISON ===")

    # Prepare train/val/test data as series for ARIMA
    # We'll combine train+val for final ARIMA training, then do walk-forward on test
    # or you can do a walk-forward on validation as well. For simplicity, let's just do train+val -> test.

    # 1) Prepare the train, val, test DataFrames for ARIMA
    arima_train_series = prepare_arima_data(df_train)  # hourly data for train
    arima_val_series   = prepare_arima_data(df_val)
    arima_test_series  = prepare_arima_data(df_test)

    # 2) Optionally combine train+val for final ARIMA. 
    #    If you want to do walk-forward on the entire test, you can do that directly from train.
    combined_train_val_series = pd.concat([arima_train_series, arima_val_series])

    # 3) Choose ARIMA(p, d, q). 
    #    For example, let's pick (1,1,1). You can tune this with ACF/PACF or grid search.
    p, d, q = 1, 1, 1

    print("Fitting ARIMA on combined (train+val) data ...")
    # We'll do a walk-forward approach on the test data
    # to simulate real forecasting scenario.
    arima_predictions = walk_forward_arima(combined_train_val_series, arima_test_series, p, d, q)

    # 4) Compute ARIMA MSE and MAPE on the test set
    # Make sure we align shapes
    test_actual = arima_test_series.values
    if len(test_actual) == len(arima_predictions):
        arima_mse  = mean_squared_error(test_actual, arima_predictions)
        arima_mape = mean_absolute_percentage_error(test_actual, arima_predictions)

        print(f"ARIMA Test MSE:  {arima_mse:.6f}")
        print(f"ARIMA Test MAPE: {arima_mape:.2f}%")
    else:
        print("Error: mismatch in length between ARIMA predictions and test data!")

    # 5) Compare ARIMA vs LSTM
    print("\n=== Final Comparison (Test Set) ===")
    if best_lstm_mse is not None and best_lstm_mape is not None:
        print(f"LSTM Model  => MSE:  {best_lstm_mse:.6f}, MAPE: {best_lstm_mape:.2f}%")
    else:
        print("No final LSTM metrics found. Ensure you trained with 100% of the data.")

    if 'arima_mse' in locals() and 'arima_mape' in locals():
        print(f"ARIMA Model => MSE:  {arima_mse:.6f}, MAPE: {arima_mape:.2f}%")
    else:
        print("No ARIMA metrics found. Check ARIMA training logic.")

=== Initializing DataLoader ===
Initial CSV shape: (610782, 9)
NaNs in 'close' before resampling: 0
Shape after resampling: (10180, 1)
First few rows after resampling + forward-fill:
                 date     close
0 2021-01-01 00:00:00       NaN
1 2021-01-01 01:00:00  29047.01
2 2021-01-01 02:00:00  29518.58
3 2021-01-01 03:00:00  29278.12
4 2021-01-01 04:00:00  29386.34

=== get_train_val_test ===

=== format_data for 2021-01-01 to 2022-01-31 ===
  => Data range shape: (9481, 2)
  => Scaled data stats:
     close.min = 28976.74
     close.max = 68603.28
     scaled_close.min = 0.0
     scaled_close.max = 1.0
  => NaNs in scaled_close: 1

=== format_data for 2022-02-01 to 2022-02-20 ===
  => Data range shape: (457, 2)
  => Scaled data stats:
     close.min = 36462.27
     close.max = 45427.47
     scaled_close.min = 0.0
     scaled_close.max = 1.0
  => NaNs in scaled_close: 0

=== format_data for 2022-02-21 to 2022-02-28 ===
  => Data range shape: (169, 2)
  => Scaled data stats:
    

### Inference Time
The inference time of the trained LSTM model was measured using 20 randomly selected test samples. Each sample was processed individually, and the time taken for prediction was recorded in milliseconds. The average inference time was computed to provide a benchmark for the model's efficiency.


In [134]:
print("\n=== Measuring Inference Time with final LSTM model ===")
# We'll do 20 random samples from X_test
times = []
n_iterations = 20

for _ in range(n_iterations):
    random_index = np.random.randint(0, len(X_test))
    random_sample = X_test[random_index]
            
    start_time = time.time()
    best_model.model.predict(np.array([random_sample]), verbose=0)
    end_time = time.time()
            
    # Convert to milliseconds
    inference_time = 1000.0 * (end_time - start_time)
    times.append(inference_time)

average_inference_time = np.mean(times)
print(f"Average Inference Time (20 random samples): {average_inference_time:.2f} ms")




=== Measuring Inference Time with final LSTM model ===
Average Inference Time (20 random samples): 129.41 ms


### Final Trained Weights of the LSTM Model

After training, the LSTM model's learned parameters, including weights and biases, were extracted and analyzed. Each layer of the network has a unique set of parameters whose shapes depend on the architecture of the model. Understanding these parameter dimensions helps in interpreting the model’s learned features and can be useful for debugging, fine-tuning, or applying transfer learnin.


In [136]:
print("\n=== Final Trained Parameters of the Neural Network ===")

# Retrieve the weights from the LSTM model
trained_weights = best_model.model.get_weights()

for i, param in enumerate(trained_weights):
    print(f"\n🔹 Parameter {i+1}: Shape {param.shape}")
    print(param)  # Prints the actual numerical values




=== Final Trained Parameters of the Neural Network ===

🔹 Parameter 1: Shape (1, 80)
[[nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
  nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
  nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
  nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
  nan nan nan nan nan nan nan nan]]

🔹 Parameter 2: Shape (20, 80)
[[nan nan nan ... nan nan nan]
 [nan nan nan ... nan nan nan]
 [nan nan nan ... nan nan nan]
 ...
 [nan nan nan ... nan nan nan]
 [nan nan nan ... nan nan nan]
 [nan nan nan ... nan nan nan]]

🔹 Parameter 3: Shape (80,)
[nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan nan
 nan nan nan nan nan nan nan nan]

🔹 Pa