In [235]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import (Dense, LSTM, Conv1D, MaxPooling1D, Flatten, Dropout, BatchNormalization, Layer, 
                        Bidirectional, MultiHeadAttention, LayerNormalization, Lambda)
from keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.regularizers import l2
import yfinance as yf
from datetime import datetime, timedelta, time
#import talib

import random

In [236]:
# Fetch Bitcoin data
# Daily BTC-USD history from Yahoo Finance, from 2014-01-01 up to today.
data = yf.download('BTC-USD', start='2014-01-01', end=pd.Timestamp.today())

data = data.reset_index()  # Make Date a regular column
# The columns may come back either as a (field, ticker) MultiIndex or already flat
# (this depends on the yfinance version / arguments); only strip the ticker level
# when it actually exists, otherwise droplevel(1) would raise.
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.droplevel(1)  # Remove the Ticker level from column
# Use the lower-case names the rest of the notebook works with
data = data.rename(columns={'Close':'price', 'Date':'date', 'Volume':'volume'})
# Untouched copy of the raw download; used later to plot the price history
btc_original = data.copy()

print(data[-1:])

[*********************100%***********************]  1 of 1 completed

Price       date          price        High         Low          Open  \
3925  2025-06-16  106796.757812  108915.375  104997.625  105555.59375   

Price       volume  
3925   50366626945  





In [237]:
def compute_rsi(series, window=14):
    """Relative Strength Index based on simple rolling means.

    Args:
        series: pandas Series of prices.
        window: number of rows averaged for gains and losses.

    Returns:
        Series aligned with ``series``; the first ``window - 1`` entries are NaN.
        Gains and losses are averaged with a plain rolling mean (no exponential
        smoothing). A window without any loss yields 100; a window with neither
        gains nor losses yields NaN (0 / 0).
    """
    def _window_mean(values):
        return values.rolling(window=window).mean()

    change = series.diff()
    # Non-matching rows (including the leading NaN of diff) count as 0
    ups = change.where(change > 0, 0)
    downs = -change.where(change < 0, 0)

    relative_strength = _window_mean(ups) / _window_mean(downs)
    return 100 - (100 / (1 + relative_strength))

def compute_macd(series, slow=26, fast=12, signal=9):
    """MACD histogram: (fast EMA - slow EMA) minus its own signal EMA.

    Args:
        series: pandas Series of prices.
        slow: span of the slow exponential moving average.
        fast: span of the fast exponential moving average.
        signal: span of the EMA applied to the MACD line.

    Returns:
        Series aligned with ``series`` holding MACD line minus signal line
        (i.e. the histogram, not the MACD line itself).
    """
    def _ema(values, span):
        return values.ewm(span=span).mean()

    macd_line = _ema(series, fast) - _ema(series, slow)
    return macd_line - _ema(macd_line, signal)

In [238]:
# Calculate OBV
def calculate_obv(df):
    """Add an On-Balance Volume column ``'obv'`` to ``df`` and return it.

    OBV starts at 0 on the first row; on every later row the row's volume is
    added when the price rose versus the previous row, subtracted when it fell,
    and left out when the price is unchanged (or not comparable because of NaN).

    Args:
        df: DataFrame with ``'price'`` and ``'volume'`` columns.

    Returns:
        The same DataFrame object, modified in place with the new ``'obv'`` column.
    """
    # +1 / -1 / 0 per row; the first row (NaN diff) and NaN prices contribute 0.
    # Kept as integers so an integer volume column yields an integer OBV.
    direction = np.sign(df['price'].diff()).fillna(0).astype('int64')
    # Vectorised running total instead of a Python loop over .iloc (also handles
    # an empty frame, which the list-based version could not assign)
    df['obv'] = (direction * df['volume']).cumsum()
    return df

# Add the 'obv' column to the price frame (calculate_obv also modifies `data` in place)
data = calculate_obv(data)

KeyError: 'Close'

In [None]:
# Derived feature columns, all computed from the closing price
data = data.assign(
    # 14-row RSI and MACD histogram (see compute_rsi / compute_macd)
    rsi=compute_rsi(data['price'], window=14),
    macd=compute_macd(data['price']),
    # Row-over-row fractional price change
    daily_prt=data['price'].pct_change(),
    # 5-row simple moving average of the price
    ma5=data['price'].rolling(5).mean(),
)
# Fractional change of the moving average (selected as the model target further down)
data['ma5_prt'] = data['ma5'].pct_change()
data.tail(10)

In [None]:
# Reduce every timestamp to its calendar date and store it back as a midnight
# datetime, so the column can serve as a merge key for the daily tables below
data['date'] = pd.to_datetime(data['date'].dt.date)

In [None]:
# Drop every row that has a missing value in any column (e.g. the first rows,
# where the rolling / pct_change features are not defined yet), then summarise
# the remaining columns and dtypes.
data = data.dropna()
data.info()

In [None]:
# Preview the first rows of the current frame
data.head()

In [None]:
import requests
from io import StringIO

#BTC On chain metrics
#source: https://www.blockchain.com/en/explorer

# Chart endpoints queried by get_chain_metrics; both request the full history as JSON.
#difficulty
url_diff = 'https://api.blockchain.info/charts/difficulty?timespan=all&rollingAverage=1days&start=2010-01-01&format=json'
#hash rate
url_hash = 'https://api.blockchain.info/charts/hash-rate?timespan=all&rollingAverage=1days&start=2010-01-01&format=json'

def get_chain_metrics(url, column_name):
    """Download one chart series and return it resampled to a daily grid.

    The response JSON is expected to carry a ``'values'`` list of records with
    ``'x'`` (epoch seconds) and ``'y'`` (metric value).

    Args:
        url: chart endpoint returning the JSON described above.
        column_name: name given to the metric column in the result.

    Returns:
        DataFrame with columns ``[column_name, 'date']`` and a 0..n-1 index, one
        row per 86400-second step between the first and last sample. Gaps are
        filled by linear interpolation. All dates are shifted by one constant
        offset so that the most recent sample is labelled with today's local
        calendar date. Returns ``None`` (after printing the status code) when
        the HTTP status is not 200.
    """
    # A timeout keeps the notebook from hanging forever on a stalled connection
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        return None

    payload = response.json()
    samples = pd.DataFrame(payload['values'])

    # Metric values indexed by their epoch-second timestamp
    series = samples.set_index('x')['y']
    first_sec = int(series.index.min())
    last_sec = int(series.index.max())

    # Daily grid anchored at the first sample; only samples that fall exactly on
    # the grid survive the reindex, everything in between is interpolated.
    # NOTE(review): assumes the API timestamps are spaced in whole days - confirm.
    daily_index = pd.RangeIndex(start=first_sec, stop=last_sec + 1, step=86400)
    daily_values = series.reindex(daily_index).interpolate(method='linear')

    # Offset that moves the newest sample onto today's date (the data often lags
    # behind). "Today" is taken as a naive calendar date and converted to epoch
    # seconds without any timezone shift, because the result is decoded below
    # with unit='s'; mixing a local-time epoch with that decoding would label
    # the rows one day early in timezones east of UTC.
    today_sec = (pd.Timestamp.today().normalize() - pd.Timestamp(0)) // pd.Timedelta(seconds=1)
    day_offset = today_sec - last_sec

    # Midnight-normalised dates so the column can be used as a merge key
    dates = pd.to_datetime(daily_index + day_offset, unit='s').normalize()

    chain_m = pd.DataFrame({column_name: daily_values.to_numpy(), 'date': dates})

    print(chain_m.head(10))
    return chain_m

# Fetch both on-chain series as daily frames keyed by 'date'.
# get_chain_metrics returns None when the HTTP status is not 200, in which case
# the .tail() calls below raise AttributeError.
diff_df = get_chain_metrics(url_diff, 'difficulty')
hash_df = get_chain_metrics(url_hash, 'hash_rate')

print(diff_df.tail())
print(hash_df.tail())

In [None]:
#merge on chain metrics
# Outer joins keep dates that exist on only one side; those rows carry NaN in
# the columns of the other side.
data = data.merge(diff_df, on='date', how='outer')  #merge
data = data.merge(hash_df, on='date', how='outer')  #merge

In [None]:
# Inspect the most recent rows after the merge
data.tail(10)

In [None]:
# Keep only rows that are complete in every column (removes dates that were
# present in just one of the merged tables)
data = data.dropna()
data.tail()

In [None]:
#Fear and Greed Index data
import requests
from io import StringIO

#source: https://alternative.me/crypto/fear-and-greed-index/
url = 'https://api.alternative.me/fng/?limit=0'
# A timeout keeps the notebook from hanging forever on a stalled connection
response = requests.get(url, timeout=30)

if response.status_code == 200:
    data_r = response.json()
    df_response = pd.DataFrame(data_r['data'])

    # Convert and format columns
    # The epoch seconds are cast to a numeric dtype first: they presumably arrive
    # as strings (like 'value'), and pandas deprecates parsing strings together
    # with `unit=`.
    df_response['date'] = pd.to_datetime(pd.to_numeric(df_response['timestamp']), unit='s')
    df_response = df_response[['value', 'value_classification', 'date']] \
         .rename(columns={
             'value': 'fng_value',
             'value_classification': 'classification'
         })

    print(df_response.head(10))

else:
    # NOTE: df_response stays undefined on this path, so the following cells fail
    print(f"Error: {response.status_code}")

In [None]:
# Store the index value as a float column (it is converted explicitly here, so
# it presumably arrives as text), then continue with an independent copy
df_response['fng_value'] = df_response['fng_value'].astype(float)
fng_df = df_response.copy()
fng_df.tail(10)

In [None]:
# Attach the Fear & Greed columns by date; the outer join keeps dates that exist
# on only one side (their missing columns are NaN)
data_2018 = data.merge(fng_df, on='date', how='outer')  #merge
data_2018.head()

In [None]:
# Keep only the dates for which every column, including the Fear & Greed
# values, is available
data_2018 = data_2018.dropna()
data_2018.head()

In [None]:
#Change main data
# From here on `data` is the frame that includes the Fear & Greed columns
data = data_2018.copy()

In [None]:
# Inspect the most recent rows of the frame used for modelling
data.tail()

In [None]:
# Model inputs; the order of this list defines the feature axis of X
features = ['price', 'rsi', 'macd', 'volume', 'obv', 'fng_value',
            'daily_prt',
            'difficulty', 'hash_rate',
            'ma5',
            'ma5_prt']
target = 'ma5_prt'
target_index = features.index(target)

# Plain 2-D array (rows x features) that is scaled in place below
filtered_data = data[features].values

# One MinMaxScaler per feature, kept by name so values can be scaled and
# un-scaled individually later on.
# NOTE(review): the scalers are fitted on ALL rows, i.e. before the train/test
# split made further down, so the test period influences the scaling - confirm
# whether that is acceptable.
scalers = {}
for col, name in enumerate(features):
    scaler = MinMaxScaler(feature_range=(0, 1))
    filtered_data[:, col:col+1] = scaler.fit_transform(filtered_data[:, col:col+1])
    scalers[name] = scaler

#look_back = 70
look_back = 50

# Sliding windows: each sample holds the `look_back` rows before row `end`,
# and its label is the scaled target value at row `end`
window_ends = range(look_back, len(data))
X = np.array([filtered_data[end-look_back:end, :] for end in window_ends])
y = np.array([filtered_data[end, target_index] for end in window_ends])
X = np.reshape(X, (X.shape[0], X.shape[1], len(features)))

print(X.shape)
print(y.shape)

In [None]:
# (rows, features) of the scaled feature matrix
filtered_data.shape

In [None]:

# CNN-LSTM regressor: convolutional feature extraction over the look-back
# window, two stacked LSTMs, then a small dense head with a single output
model = Sequential([
    # Convolution / pooling stage
    Conv1D(filters=256, kernel_size=2, activation='relu', input_shape=(look_back, len(features))),
    MaxPooling1D(pool_size=2),
    Conv1D(filters=256, kernel_size=1, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    Dropout(0.3),

    # Recurrent stage; the first LSTM hands its full sequence to the second
    LSTM(units=100, return_sequences=True),
    Dropout(0.3),
    LSTM(units=50),
    Dropout(0.2),

    # Fully connected tanh layer (a plain Dense layer, not an attention mechanism)
    Dense(units=100, activation='tanh'),
    Dropout(0.3),

    # Regression head producing one scaled target value
    Dense(units=50, activation='relu'),
    Dense(units=1),
])

model.compile(optimizer=Adam(learning_rate=0.004), loss='mean_squared_error')


In [None]:
# Main execution
# Chronological hold-out: the last `test_size` share of the windows becomes the
# test set, everything before it the training set (no shuffling).
test_size = 0.05  # Percentage of data to use for testing
#test_size = len(X) - look_back -1

# Split into train and test sets
split = int(len(X) * (1 - test_size))
#split = int(len(X) -1)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

In [None]:
# Training hyper-parameters
epochs = 114
batch_size = 32

# NOTE(review): this callback is created but not passed to fit() (the
# `callbacks` argument below is commented out), so training always runs for the
# full number of epochs.
early_stop = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False, verbose=1)

# Create and train model
# NOTE(review): the test split doubles as validation data here, so the reported
# val_loss is not independent of the later test evaluation.
history = model.fit(X_train, y_train, 
                    epochs=epochs, 
                    batch_size=batch_size, 
                    validation_data=(X_test, y_test),
                    #callbacks=[early_stop],
                    verbose=1)

In [None]:
# Make predictions
# Outputs are still in the scaled target space at this point
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

print(f"y_test shape: {y_test.shape}")  # Should be (n_samples,)
print(f"Predictions shape: {test_predict.shape}")  # Should match y_test

In [None]:
def inverse_scaler(scaled_data, features, scalers):
    """Map scaled target values back to the target's original units.

    Args:
        scaled_data: array of scaled target values (any shape; it is flattened).
        features: ordered list of feature names.
        scalers: dict mapping feature name to its fitted scaler.

    Returns:
        1-D float array with the inverse-transformed target values.

    Note:
        The target column is selected through the module-level ``target_index``.
    """
    # Only the target's scaler matters for the result, so transform just that
    # column instead of building and inverting a full dummy feature matrix
    target_scaler = scalers[features[target_index]]
    column = np.asarray(scaled_data, dtype=float).reshape(-1, 1)
    return target_scaler.inverse_transform(column).flatten()

# Bring predictions and labels back to the target's original units.
# NOTE: the variables are overwritten in place, so running this cell a second
# time would apply the inverse transform twice.
train_predict = inverse_scaler(train_predict, features, scalers)
test_predict = inverse_scaler(test_predict, features, scalers)
y_train = inverse_scaler(y_train, features, scalers)
y_test = inverse_scaler(y_test, features, scalers)

In [None]:
# Calculate RMSE
# Errors are expressed in units of the target column
train_rmse = np.sqrt(mean_squared_error(y_train, train_predict))
test_rmse = np.sqrt(mean_squared_error(y_test, test_predict))
# The target is a fractional change (values around 0.01), so two decimals would
# round the error to 0.00 / 0.01; print more digits
print(f'Train RMSE: {train_rmse:.6f}')
print(f'Test RMSE: {test_rmse:.6f}')

In [None]:
# Spot-check: first ten training predictions and last ten test predictions
print(train_predict[0:10])
print(test_predict[-10:])

In [None]:
# Plot results
plt.figure(figsize=(14, 7))

# Plot baseline and predictions
# What is plotted is the target column (not the raw price), so the labels name
# the target. Predictions start `look_back` rows into `data` because every
# sample needs a full look-back window of history.
plt.plot(data['date'], data[target], label=f'Actual {target}')
plt.plot(data[look_back:split + look_back]['date'], train_predict, label='Training Prediction')
plt.plot(data[split + look_back:]['date'], test_predict, label='Testing Prediction')
plt.title(f'Bitcoin {target} Prediction using CNN-LSTM')
plt.xlabel('Date')
plt.ylabel(target)
plt.legend()
plt.show()

In [None]:
# Plot results
plt.figure(figsize=(16, 8))

# Plot historical data
# The series drawn here is the traded volume of the last 200 rows; labels and
# title describe exactly that.
# NOTE(review): if the price was meant to be shown, change the column instead.
plt.plot(data['date'][-200:], data['volume'][-200:], label='Historical Volume', color='blue')  # Last 200 rows

# Formatting
plt.title('Bitcoin Volume: Last 200 Rows')
plt.xlabel('Date')
plt.ylabel('Volume')
plt.legend()
plt.grid(True)

# Show the plot
plt.show()

In [None]:
# Most recent rows of the modelling frame
data.tail()

In [None]:
# Independent working copy for the forecasting step, so `data` stays untouched
data_future = data.copy()

In [None]:
# Feature names in model-input order, and how many there are
print(features)
print(len(features))

In [None]:
# Prepare last sequence
# The newest `look_back` rows form the first model input of the recursive forecast
last_sequence = data.iloc[-look_back:].copy()
scaled_sequence = last_sequence.copy()

# Scale each feature with the scaler that was fitted for it
for column in features:
    scaled_sequence[column] = scalers[column].transform(
        last_sequence[column].values.reshape(-1, 1)).ravel()

predictions = []
current_sequence = scaled_sequence[features].values.copy()

days_to_predict = 90

# Running state of the quantities that are extrapolated day by day. It is kept
# in plain Python values and rebuilt from `data` on every run, so the cell can
# be re-executed without accumulating rows from an earlier run.
base_frame = data.reset_index(drop=True)
prices = base_frame['price'].tolist()
fng_values = base_frame['fng_value'].tolist()
volume = float(base_frame['volume'].iloc[-1])
obv = float(base_frame['obv'].iloc[-1])
difficulty = float(base_frame['difficulty'].iloc[-1])
hash_rate = float(base_frame['hash_rate'].iloc[-1])
ma5_prev = float(base_frame['ma5'].iloc[-1])
last_date = pd.to_datetime(data['date'].iloc[-1])  # last real calendar date

future_rows = []
for step in range(1, days_to_predict + 1):
    # Reshape for prediction
    x_input = current_sequence.reshape((1, look_back, len(features)))

    # Make prediction (silent: one progress bar per day would flood the output)
    predicted_price_scaled = model.predict(x_input, verbose=0)

    # The model outputs the scaled target; bring it back to a fractional change
    predicted_price = float(scalers[target].inverse_transform(predicted_price_scaled)[0, 0])

    # NOTE(review): the predicted change of the target is applied to the price
    # as a daily return, as in the original design - confirm this approximation.
    last_price = prices[-1]
    new_price = (1 + predicted_price) * last_price
    predictions.append(new_price)
    prices.append(new_price)
    price_series = pd.Series(prices)

    #Volume: previous day's volume with a random -2%..+4% change
    volume = volume * (1 + random.uniform(-.02, 0.04))

    #On Balance Volume: continue the running total from the previous value
    if new_price > last_price:
        obv += volume
    elif new_price < last_price:
        obv -= volume

    #Fear and Greed: mean of the previous 7 values
    fng_value = float(np.mean(fng_values[-7:]))
    fng_values.append(fng_value)

    #Difficulty: assumed to grow 0.6% per day; hash rate is carried forward
    difficulty = difficulty * 1.006

    #Moving average over 5 rows (same window as the 'ma5' training feature)
    ma5 = float(np.mean(prices[-5:]))

    predicted_row = {
        'date': last_date + pd.Timedelta(days=step),
        'price': new_price,
        'rsi': compute_rsi(price_series, window=14).iloc[-1],
        'macd': compute_macd(price_series).iloc[-1],
        'volume': volume,
        'obv': obv,
        'fng_value': fng_value,
        'daily_prt': predicted_price,
        'difficulty': difficulty,
        'hash_rate': hash_rate,
        'ma5': ma5,
        'ma5_prt': ma5 / ma5_prev - 1,
    }
    ma5_prev = ma5
    future_rows.append(predicted_row)

    # Scale every feature with its own scaler and place it in the slot given by
    # its position in `features`, so the row always matches the model's input
    # layout (no hand-maintained column numbers)
    new_row = np.array([
        scalers[name].transform([[predicted_row[name]]])[0][0] for name in features
    ])

    # Update sequence: drop the oldest row, append the newest
    current_sequence = np.vstack([current_sequence[1:], new_row])

# History plus forecast rows in one frame (built once instead of row by row);
# columns that are not forecast stay NaN on the new rows
data_future = pd.concat([base_frame, pd.DataFrame(future_rows)], ignore_index=True)

# Create dates for predictions
prediction_dates = [last_date + pd.Timedelta(days=i) for i in range(1, days_to_predict+1)]

In [None]:
#print(future_df.head())
# One-element Series holding the last date of `data` (a Series, not a scalar)
last_date = data['date'][-1:]
#print(data['date'][-1:] + pd.Timedelta(days=3))

#prediction_dates = [last_date + pd.Timedelta(days=i) for i in range(1, days_to_predict+1)]
#print(predictions)

In [None]:
# Number of seconds in one day
timestamp_day = 86400

In [None]:
# `last_date` is a one-element Series; take its value as a Timestamp
last_date_ts = pd.to_datetime(last_date.values[0])
# Epoch seconds of the last date (kept because a later cell displays it)
last_date_timestamp = last_date_ts.timestamp()

# One calendar date per forecast day, starting the day after the last data row.
# Plain date arithmetic avoids converting through epoch seconds and the local
# timezone (which needed a half-day safety margin to land on the right date).
prediction_dates = [(last_date_ts + pd.Timedelta(days=i)).date() for i in range(1, days_to_predict+1)]
print(prediction_dates)

In [None]:
# Sanity check: the last data date as a local-time datetime
datetime.fromtimestamp(last_date_timestamp)

In [None]:
# Sanity check: first forecast date in ISO format
prediction_dates[0].isoformat()

In [None]:
# Scratch checks of epoch-timestamp arithmetic; nothing below is used elsewhere.
#datetime.date(prediction_dates[0].values[0])
# Current epoch time, its remainder within a day, and the time minus that remainder
print(datetime.timestamp(datetime.now()))
print(datetime.timestamp(datetime.now()) % timestamp_day)
print(datetime.timestamp(datetime.now()) - (datetime.timestamp(datetime.now()) % timestamp_day))
print('\n')
# Epoch values of four consecutive local midnights
print(datetime.timestamp(datetime(2025, 6, 12)))
print(datetime.timestamp(datetime(2025, 6, 13)))
print(datetime.timestamp(datetime(2025, 6, 14)))
print(datetime.timestamp(datetime(2025, 6, 15)))
print('\n')
# Difference between two consecutive midnights
print(datetime.timestamp(datetime(2025, 6, 15)) - datetime.timestamp(datetime(2025, 6, 14)))

In [None]:
# Create DataFrame for future predictions
future_df = pd.DataFrame({
    'Date': prediction_dates,
    'Predicted_Price': predictions
})
future_df.set_index('Date', inplace=True)

# Plot results
plt.figure(figsize=(16, 8))

# Plot historical data
plt.plot(btc_original['date'][-500:], btc_original['price'][-500:], label='Historical Price', color='blue')  # Last 500 rows

# Plot future predictions
# The horizon shown in labels follows `days_to_predict` instead of a fixed number
plt.plot(prediction_dates, predictions, label=f'{days_to_predict}-Day Prediction', color='red', linestyle='--')

# Formatting
plt.title(f'Bitcoin Price: History & {days_to_predict}-Day Prediction')
plt.xlabel('Date')
plt.ylabel('Price (USD)')
plt.legend()
plt.grid(True)

# Show the plot
plt.show()

# Print the predictions
print(f"\nPredicted Bitcoin Prices for the Next {days_to_predict} Days:")
print(future_df.head(10))  # Show first 10 days of prediction

In [None]:
# Turn the 'Date' index back into a regular column.
# NOTE: not idempotent - running the cell again adds an extra 'index' column.
future_df = future_df.reset_index()
future_df

In [None]:
#future_df = future_df.reset_index()
#future_df = future_df.rename(columns={'Date':'date', 'Predicted_price':'price'})  # Rename Price
# Keep exactly the two columns that are exported
future_df = future_df[['Date', 'Predicted_Price']]

future_df

In [None]:
# Write the forecast to a CSV file in the working directory (without the row index)
future_df.to_csv('lstm_cnn_btc_price.csv', index=False)