In [3]:
import keras
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import yfinance as yf
import pandas as pd

In [9]:
# Fetch BTC-USD price history for the 2020-01-01 .. 2025-01-01 request window.
df = yf.download("BTC-USD", start="2020-01-01", end="2025-01-01")

# Closing prices, kept with their date index for the time-based split further down.
data = df["Close"]

# The same closing prices as a plain NumPy array, used for windowing.
bitcoin_prices = data.values

  df = yf.download("BTC-USD", start="2020-01-01", end="2025-01-01")
[*********************100%***********************]  1 of 1 completed


**Preparing Time Series Data**

- Creating Windows and Labels:

In [7]:
def create_windows_labels(time_series, window_size=7, horizon=1):
    """Slice a series into overlapping input windows and the values that follow them.

    Sample ``i`` pairs ``time_series[i : i + window_size]`` (the window) with
    ``time_series[i + window_size : i + window_size + horizon]`` (the label).

    Args:
        time_series: Array-like indexed by time along axis 0. Any trailing
            axes (for example a feature axis) are carried through unchanged.
        window_size: Number of consecutive time steps in each window.
        horizon: Number of time steps in each label.

    Returns:
        Tuple ``(windows, labels)`` of NumPy arrays with shapes
        ``(n_samples, window_size, ...)`` and ``(n_samples, horizon, ...)``,
        where ``n_samples = len(time_series) - window_size - horizon + 1``.
        When the series is too short to yield a single sample, both arrays are
        empty but keep their window/horizon (and trailing) axes, so downstream
        shape handling still works.
    """
    series = np.asarray(time_series)

    # Clamp at zero so a too-short series gives zero samples rather than a negative count.
    n_samples = max(len(series) - window_size - horizon + 1, 0)

    # Column vector of start offsets; broadcasting against the step offsets
    # yields one row of indices per sample.
    starts = np.arange(n_samples)[:, None]
    window_idx = starts + np.arange(window_size)
    label_idx = starts + window_size + np.arange(horizon)

    return series[window_idx], series[label_idx]


# Window the full price array: 7 consecutive values as input, the following value as label.
windows, labels = create_windows_labels(bitcoin_prices, window_size=7, horizon=1)


# Sanity check: both arrays should have the same number of samples along axis 0.
print(f"Windows shape: {windows.shape}")
print(f"Labels shape: {labels.shape}")

Windows shape: (1820, 7, 1)
Labels shape: (1820, 1, 1)


**Time-Based Train/Test Split:**

In [10]:
# Chronological split: rows dated before `split_date` train the models, the rest
# are held out, so no future prices leak into training.
split_date = '2023-01-01'
train_data = data.loc[data.index < split_date]
test_data = data.loc[data.index >= split_date]

# Window each partition separately so that no window straddles the split date.
train_windows, train_labels = create_windows_labels(train_data.values, 7)
test_windows, test_labels = create_windows_labels(test_data.values, 7)

**Building Time Series Models**

- Model 0: Naive Forecast (Baseline):

In [11]:
def naive_forecast(data, horizon=1):
    """Naive (persistence) forecast: carry the last observed value forward.

    Every one of the ``horizon`` future steps is predicted to equal the final
    observation in ``data``. For ``horizon=1`` this is the same value that
    ``data[-1:]`` yields.

    Args:
        data: Array-like of observations ordered by time along axis 0.
        horizon: Number of future steps to forecast.

    Returns:
        NumPy array of shape ``(horizon, ...)`` whose rows all equal the last
        row of ``data``. It is empty when ``horizon`` is 0 or ``data`` is empty.
    """
    observed = np.asarray(data)
    # Slice with -1: (not [-1]) to keep the leading axis, so repeating along
    # axis 0 yields one row per forecast step. A plain data[-horizon:] would
    # replay old observations and, for horizon=0, return the whole series.
    return np.repeat(observed[-1:], horizon, axis=0)

# Baseline spanning the whole test period, derived from the training series only.
naive_pred = naive_forecast(train_data.values, horizon=len(test_data))

- Model 1: Dense Model:

In [13]:
# Dense baseline: one window of 7 past values in, one predicted value out.
# The input shape is declared with an explicit Input object instead of the
# `input_shape=` kwarg on the first layer, which newer Keras versions warn about.
model_1 = keras.Sequential([
    layers.Input(shape=(7,)),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(1)
])

# MAE is the optimisation target; MAE and MSE are also tracked as metrics.
model_1.compile(
    optimizer='adam',
    loss='mae',
    metrics=['mae', 'mse']
)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


- Model 2: LSTM for Time Series:

In [14]:
# Stacked LSTM over windows of 7 time steps with 1 feature each.
# The input shape is declared with an explicit Input object instead of the
# `input_shape=` kwarg on the first layer, which newer Keras versions warn about.
model_2 = keras.Sequential([
    layers.Input(shape=(7, 1)),
    # return_sequences=True so the second LSTM receives the full sequence.
    layers.LSTM(64, activation='relu', return_sequences=True),
    layers.LSTM(32, activation='relu'),
    layers.Dense(1)
])

# Reshape data for LSTM (needs 3D: samples, timesteps, features).
# This leaves the data unchanged if the windows are already (samples, 7, 1).
train_windows_lstm = train_windows.reshape(-1, 7, 1)
test_windows_lstm = test_windows.reshape(-1, 7, 1)

  super().__init__(**kwargs)


- Model 3: Conv1D for Time Series:

In [17]:
# 1-D convolutional model over windows of 7 time steps with 1 feature each.
# The input shape is declared with an explicit Input object instead of the
# `input_shape=` kwarg on the first layer, which newer Keras versions warn about.
model_3 = keras.Sequential([
    layers.Input(shape=(7, 1)),
    layers.Conv1D(filters=64, kernel_size=3, activation='relu'),
    layers.Conv1D(filters=32, kernel_size=3, activation='relu'),
    # Collapse the remaining time axis to one value per filter.
    layers.GlobalMaxPooling1D(),
    layers.Dense(1)
])

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


- Model 4: Multivariate Time Series:

In [19]:
# NOTE(review): `price`, `volume` and `sentiment` are not defined in any cell visible
# here — confirm they are created earlier, otherwise this fails on a fresh kernel.
# Stack the three series side by side as the columns of one 2-D array.
multivariate_data = np.column_stack([price, volume, sentiment])

def create_multivariate_windows(data, window_size=7, horizon=1):
    """Build multi-feature input windows with labels taken from column 0.

    Sample ``i`` pairs the rows ``data[i : i + window_size]`` (all columns)
    with column 0 of the ``horizon`` rows that immediately follow.

    Args:
        data: 2-D array-like of shape ``(n_steps, n_features)``, ordered by
            time along axis 0. Column 0 is the series being predicted.
        window_size: Number of consecutive rows in each window.
        horizon: Number of future rows in each label.

    Returns:
        Tuple ``(windows, labels)`` of NumPy arrays with shapes
        ``(n_samples, window_size, n_features)`` and ``(n_samples, horizon)``,
        where ``n_samples = len(data) - window_size - horizon + 1``. When the
        input is too short for a single sample, both arrays are empty but keep
        their trailing axes.
    """
    # Coerce up front so lists and DataFrames support the 2-D indexing below.
    table = np.asarray(data)

    # Clamp at zero so a too-short input gives zero samples rather than a negative count.
    n_samples = max(len(table) - window_size - horizon + 1, 0)

    # Column vector of start offsets; broadcasting against the step offsets
    # yields one row of indices per sample.
    starts = np.arange(n_samples)[:, None]
    window_idx = starts + np.arange(window_size)
    label_idx = starts + window_size + np.arange(horizon)

    # Windows keep every feature; labels keep only column 0.
    return table[window_idx], table[label_idx, 0]



# Window the stacked features with 7-step windows and the function's default horizon of 1.
multivariate_windows, multivariate_labels = create_multivariate_windows(
    multivariate_data, window_size=7
)

In [20]:
# Multivariate LSTM: windows of 7 time steps with 3 features each.
# The input shape is declared with an explicit Input object instead of the
# `input_shape=` kwarg on the first layer, which newer Keras versions warn about.
model_4 = keras.Sequential([
    layers.Input(shape=(7, 3)),
    layers.LSTM(64),
    layers.Dense(1)
])

  super().__init__(**kwargs)


**Time Series Evaluation Metrics**

In [21]:
def evaluate_time_series_forecast(y_true, y_pred):
    """Compute scalar error metrics for a forecast.

    Both inputs are converted to float64 NumPy arrays and reshaped to
    ``(n_steps, -1)`` with time on axis 0, so singleton axes such as
    ``(N, 1, 1)`` labels against ``(N, 1)`` or ``(N,)`` predictions line up
    instead of silently broadcasting against each other. Each metric is
    averaged over every element and returned as a plain float.

    Args:
        y_true: Array-like of observed values, ordered by time along axis 0.
        y_pred: Array-like of predictions with the same number of time steps
            and the same number of values per step as ``y_true``.

    Returns:
        Dict with float values under the keys ``'MAE'``, ``'MSE'``,
        ``'RMSE'``, ``'MAPE'`` (in percent) and ``'MASE'``. ``'MAPE'`` is
        ``inf``/``nan`` if ``y_true`` contains zeros. ``'MASE'`` is ``nan``
        with fewer than two time steps and ``inf``/``nan`` if ``y_true`` is
        constant.

    Raises:
        ValueError: If either input is empty, or the two inputs do not hold
            the same number of values per time step.
    """
    true = np.atleast_1d(np.asarray(y_true, dtype=np.float64))
    pred = np.atleast_1d(np.asarray(y_pred, dtype=np.float64))
    if true.size == 0 or pred.size == 0:
        raise ValueError("y_true and y_pred must not be empty")

    # Keep axis 0 as time and flatten everything else.
    true = true.reshape(true.shape[0], -1)
    pred = pred.reshape(pred.shape[0], -1)
    if true.shape != pred.shape:
        raise ValueError(
            f"y_true and y_pred are incompatible: {true.shape} vs {pred.shape}"
        )

    errors = true - pred
    mae = np.mean(np.abs(errors))
    mse = np.mean(np.square(errors))
    rmse = np.sqrt(mse)

    # Zero denominators are reported as inf/nan in the result instead of
    # emitting runtime warnings.
    with np.errstate(divide='ignore', invalid='ignore'):
        # Mean Absolute Percentage Error (MAPE)
        mape = np.mean(np.abs(errors / true)) * 100

        # MASE = MAE / MAE of the one-step naive forecast (previous value) on y_true
        if true.shape[0] > 1:
            naive_mae = np.mean(np.abs(true[1:] - true[:-1]))
            mase = mae / naive_mae
        else:
            mase = np.nan

    return {
        'MAE': float(mae),
        'MSE': float(mse),
        'RMSE': float(rmse),
        'MAPE': float(mape),
        'MASE': float(mase)
    }

#### Advanced: N-BEATS Algorithm

**N-BEATS (Neural Basis Expansion Analysis for Time Series):**

In [22]:
class NBeatsBlock(layers.Layer):
    """Generic N-BEATS block: a fully connected stack followed by a theta projection.

    The input passes through ``n_layers`` ReLU Dense layers of ``n_neurons``
    units, then through a linear Dense layer of ``theta_size`` units. The first
    ``input_size`` theta values form the backcast and the next ``horizon``
    values form the forecast.

    Args:
        input_size: Length of the input window (and of the backcast).
        theta_size: Width of the theta layer; must be at least
            ``input_size + horizon``.
        horizon: Number of steps to forecast.
        n_neurons: Units in each hidden Dense layer.
        n_layers: Number of hidden Dense layers.
        **kwargs: Forwarded to ``layers.Layer`` (for example ``name``).

    Raises:
        ValueError: If ``theta_size`` is smaller than ``input_size + horizon``.
    """

    def __init__(self, input_size, theta_size, horizon, n_neurons, n_layers, **kwargs):
        super().__init__(**kwargs)
        if theta_size < input_size + horizon:
            raise ValueError(
                f"theta_size ({theta_size}) must be at least input_size + horizon "
                f"({input_size + horizon})"
            )
        self.input_size = input_size
        self.theta_size = theta_size
        self.horizon = horizon
        self.n_neurons = n_neurons
        self.n_layers = n_layers

        self.hidden = [layers.Dense(n_neurons, activation='relu') for _ in range(n_layers)]
        self.theta_layer = layers.Dense(theta_size, activation='linear', name='theta')

    def call(self, inputs):
        """Run the block and return ``(backcast, forecast)`` for a batch of windows."""
        x = inputs
        for layer in self.hidden:
            x = layer(x)
        theta = self.theta_layer(x)
        backcast, forecast = self.lambda_layer(theta)
        return backcast, forecast

    def lambda_layer(self, theta):
        """Split theta of shape ``(batch, theta_size)`` into backcast and forecast.

        The previous all-ones einsum bases had their axes transposed, so the
        contraction failed whenever ``theta_size != input_size``. Slicing is
        the identity basis used by the generic block.
        """
        backcast = theta[:, :self.input_size]
        # Explicit bounds (not a negative start) so horizon=0 yields an empty forecast.
        forecast = theta[:, self.input_size:self.input_size + self.horizon]
        return backcast, forecast

    def get_config(self):
        """Return the constructor arguments so the layer can be serialised and rebuilt."""
        config = super().get_config()
        config.update({
            'input_size': self.input_size,
            'theta_size': self.theta_size,
            'horizon': self.horizon,
            'n_neurons': self.n_neurons,
            'n_layers': self.n_layers,
        })
        return config

In [23]:
def build_nbeats_model(input_size, horizon, n_block=4, n_neurons=512, n_layers=4):
    """Assemble a doubly-residual stack of N-BEATS blocks into a Keras model.

    Each block receives the current residual, returns a backcast and a
    forecast, and has its backcast subtracted from the residual before the
    next block runs. The model output is the sum of all block forecasts.

    Args:
        input_size: Length of the input window.
        horizon: Number of steps to forecast.
        n_block: Number of stacked blocks; must be at least 1.
        n_neurons: Units in each hidden Dense layer of every block.
        n_layers: Number of hidden Dense layers in every block.

    Returns:
        An uncompiled ``keras.Model`` that takes inputs of shape
        ``(batch, input_size)`` and outputs the summed block forecasts.

    Raises:
        ValueError: If ``n_block`` is smaller than 1.
    """
    if n_block < 1:
        raise ValueError(f"n_block must be at least 1, got {n_block}")

    inputs = layers.Input(shape=(input_size,))
    residuals = inputs
    forecasts = []

    for _ in range(n_block):
        block = NBeatsBlock(
            input_size=input_size,
            theta_size=input_size + horizon,
            horizon=horizon,
            n_neurons=n_neurons,
            n_layers=n_layers
        )
        backcast, forecast = block(residuals)
        # Remove what this block has explained; the next block sees the rest.
        residuals = layers.Subtract()([residuals, backcast])
        forecasts.append(forecast)

    # A single block needs no merge layer, and some Keras versions reject
    # Add() when it is called with fewer than two inputs.
    forecast = forecasts[0] if len(forecasts) == 1 else layers.Add()(forecasts)
    model = keras.Model(inputs, forecast)
    return model

**Ensemble Models for Time Series**

In [24]:
# Ensemble members; the weights further down are matched to this order.
# NOTE(review): no fit() call for these models is visible in this notebook —
# confirm they are trained before their predictions are combined.
models = [model_1, model_2, model_3]

# One prediction array per model, all computed on the same test windows.
predictions = []
for model in models:
    pred = model.predict(test_windows)
    predictions.append(pred)

# Ensemble: Average predictions
# (unweighted mean across models; this value is replaced by the weighted version below)
ensemble_pred = np.mean(predictions, axis=0)


# Or weighted ensemble
# One weight per model, in the order of `models`.
weights = [0.3, 0.4, 0.3]
ensemble_pred = np.average(predictions, axis=0, weights=weights)

[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 26ms/step
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step


**Prediction Intervals**

In [25]:
def get_prediction_intervals(predictions, confidence=0.95):
    """Return the percentile band that covers ``confidence`` of the values along axis 0.

    Args:
        predictions: Array-like whose first axis indexes the samples the
            percentiles are taken over.
        confidence: Central fraction to cover, e.g. 0.95 keeps the
            2.5th to 97.5th percentile range.

    Returns:
        Tuple ``(lower_bound, upper_bound)`` holding the lower and upper
        percentiles computed along axis 0.
    """
    tail = 1 - confidence
    # Split the excluded mass evenly between the two tails.
    percentiles = [(tail / 2) * 100, (1 - tail / 2) * 100]

    bounds = np.percentile(predictions, percentiles, axis=0)
    return bounds[0], bounds[1]


# Percentiles are taken along axis 0, so pass the per-model predictions (axis 0 =
# ensemble member) to get one lower/upper bound per test sample. Passing the
# already-averaged `ensemble_pred` would take percentiles across time steps instead.
lower, upper = get_prediction_intervals(predictions, confidence=0.95)