In [1]:
# install packages
#pip install yfinance
#pip install scikit-optimize
#pip install ta

# Importing the required libraries
import yfinance as yf
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import warnings
warnings.filterwarnings("ignore", message="The objective has been evaluated at point")

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import MinMaxScaler

from skopt import gp_minimize
from skopt.space import Integer
from statsmodels.tsa.stattools import coint

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

from ta.momentum import RSIIndicator

from datetime import date, datetime 

In [2]:
# Define function to download stock data
def download_stock_data(stock_ticker, start_date):
    todays_date = date.today()
    stock_data = yf.download(stock_ticker, start=start_date, end=todays_date)
    return stock_data

# Define function to evaluate model (using Moving Averages)
def evaluate_model(ma_short, ma_long, stock_data):
    stock_data['MA_Short'] = stock_data['Close'].rolling(window=ma_short).mean()
    stock_data['MA_Long'] = stock_data['Close'].rolling(window=ma_long).mean()
    
    # Drop rows with NaN values after calculating the moving averages
    stock_data.dropna(subset=['MA_Short', 'MA_Long'], inplace=True)
    
    # Generate Buy and Sell Signals
    stock_data['Signal'] = np.where(stock_data['MA_Short'] > stock_data['MA_Long'], 1, -1)
    # Calculate Mean Absolute Error (MAE) based on signals for simplicity (customize as needed)
    mae = np.mean(np.abs(stock_data['Signal'].diff()))  # Example metric
    return mae, 0  # Return dummy RMSE (not calculated here)

# Define function for Bayesian Optimization
def objective(params, stock_data):
    ma_short, ma_long = params
    mae, _ = evaluate_model(ma_short, ma_long, stock_data)
    return mae  # We aim to minimize MAE

# Append new stock price to the data
def append_current_stock_price(stock_data, new_price, new_date):
    new_row = pd.DataFrame({'Open': [np.nan], 'High': [np.nan], 'Low': [np.nan], 'Close': [new_price],
                            'Adj Close': [np.nan], 'Volume': [np.nan]}, index=[new_date])
    stock_data = pd.concat([stock_data, new_row])
    return stock_data

# Define trading performance
def evaluate_trading_performance(ma_short, ma_long, stock_data):
    # Calculate the moving averages
    stock_data['MA_Short'] = stock_data['Close'].rolling(window=ma_short).mean()
    stock_data['MA_Long'] = stock_data['Close'].rolling(window=ma_long).mean()
    
    # Drop rows with NaN values
    stock_data.dropna(subset=['MA_Short', 'MA_Long'], inplace=True)
    
    # Generate Buy/Sell signals
    stock_data['Signal'] = np.where(stock_data['MA_Short'] > stock_data['MA_Long'], 1, -1)
    stock_data['Position'] = stock_data['Signal'].diff()
    
    # Simulate the trading performance
    initial_cash = 100000  # Starting capital
    shares = 0
    cash = initial_cash
    portfolio_value = []
    
    for i in range(len(stock_data)):
        if stock_data['Position'].iloc[i] == 2:  # Buy signal
           shares = cash // stock_data['Close'].iloc[i]
           cash -= shares * stock_data['Close'].iloc[i]
        
        elif stock_data['Position'].iloc[i] == -2:  # Sell signal
           cash += shares * stock_data['Close'].iloc[i]
           shares = 0
        
        # Track portfolio value (cash + shares value)
        portfolio_value.append(cash + shares * stock_data['Close'].iloc[i])
    
    # Calculate total return
    final_portfolio_value = portfolio_value[-1]
    total_return = final_portfolio_value - initial_cash
    
    # We return the negative of the total return since gp_minimize performs minimization
    return -total_return

# Define the Bayesian Optimization function
def perform_bayesian_optimization(stock_data, n_calls=50, random_state=42):
    # Define the search space
    search_space = [
        Integer(5, 25, name='ma_short'),  # Short-term moving average range
        Integer(50, 100, name='ma_long')  # Long-term moving average range
    ]

    # Perform Bayesian Optimization
    res = gp_minimize(lambda params: objective(params, stock_data.copy()), search_space, n_calls=n_calls, random_state=random_state)
   
    # Output the best parameters
    short_window = res.x[0]
    long_window = res.x[1]
    print(f"Best MA_Short: {short_window}, Best MA_Long: {long_window}, Best MAE: {res.fun}")
    return short_window, long_window, res

# Define the Bayesian Optimization function for returns
def perform_bayesian_optimization_for_returns(stock_data, n_calls=50, random_state=42):
    # Define the search space
    search_space = [
        Integer(5, 25, name='ma_short'),  # Short-term moving average range
        Integer(50, 100, name='ma_long')  # Long-term moving average range
    ]
   
    # Perform Bayesian Optimization
    res = gp_minimize(lambda params: evaluate_trading_performance(params[0], params[1], stock_data.copy()),
                      search_space, n_calls=n_calls, random_state=random_state)
   
    # Output the best parameters
    short_window = res.x[0]
    long_window = res.x[1]
    print(f"Best MA_Short: {short_window}, Best MA_Long: {long_window}, Best Total Return: {-res.fun}")
   
    return short_window, long_window, res

# Define find best performance windows
def find_best_performance_windows(stock_ticker, start_date):
    # Download stock data for any stock
    stock_data = download_stock_data(stock_ticker, start_date)
    
    # Perform Bayesian Optimization to maximize trading returns
    short_window, long_window, optimization_result = perform_bayesian_optimization_for_returns(stock_data, n_calls=50, random_state=42)
    
    return stock_data, short_window, long_window, optimization_result

# Define run backtest with interactive chart
def run_backtest_with_interactive_chart(stock_data, short_window, long_window, initial_cash=100000):
    # Step 1: Feature Engineering - Calculate Moving Averages
    stock_data['MA_Short'] = stock_data['Close'].rolling(window=short_window).mean()
    stock_data['MA_Long'] = stock_data['Close'].rolling(window=long_window).mean()
   
    # Step 2: Generate Buy and Sell Signals
    stock_data['Signal'] = 0
    stock_data['Signal'][short_window:] = np.where(stock_data['MA_Short'][short_window:] > stock_data['MA_Long'][short_window:], 1, -1)
   
    # Step 3: Calculate Buy and Sell Points
    stock_data['Position'] = stock_data['Signal'].diff()
   
    # Step 4: Backtest Strategy - Simulate Buy/Sell and Track Earnings
    shares = 0
    cash = initial_cash
    portfolio_value = []
    buy_signals = []
    sell_signals = []
    for i in range(len(stock_data)):
        if stock_data['Position'].iloc[i] == 2:  # Buy signal
            shares = cash // stock_data['Close'].iloc[i]  # Buy as many shares as possible
            cash -= shares * stock_data['Close'].iloc[i]
            buy_signals.append(stock_data['Close'].iloc[i])  # Record the buy price
            sell_signals.append(np.nan)  # No sell signal at this point
        elif stock_data['Position'].iloc[i] == -2:  # Sell signal
            cash += shares * stock_data['Close'].iloc[i]  # Sell all shares
            shares = 0  # Reset shares
            sell_signals.append(stock_data['Close'].iloc[i])  # Record the sell price
            buy_signals.append(np.nan)  # No buy signal at this point
        else:
            buy_signals.append(np.nan)  # No buy signal
            sell_signals.append(np.nan)  # No sell signal        
        # Track portfolio value (cash + shares value)
        portfolio_value.append(cash + shares * stock_data['Close'].iloc[i])
   
    # Step 5: Create an interactive chart for stock price, buy/sell signals, and portfolio value
    # Create the stock price chart
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=stock_data.index, y=stock_data['Close'], mode='lines', name='Stock Price', line=dict(color='blue')))
    fig.add_trace(go.Scatter(x=stock_data.index, y=stock_data['MA_Short'], mode='lines', name=f'{short_window}-day MA', line=dict(color='green', dash='dash')))
    fig.add_trace(go.Scatter(x=stock_data.index, y=stock_data['MA_Long'], mode='lines', name=f'{long_window}-day MA', line=dict(color='red', dash='dash')))
    fig.add_trace(go.Scatter(x=stock_data.index, y=buy_signals, mode='markers', name='Buy Signal', marker=dict(color='green', symbol='triangle-up', size=10)))
    fig.add_trace(go.Scatter(x=stock_data.index, y=sell_signals, mode='markers', name='Sell Signal', marker=dict(color='red', symbol='triangle-down', size=10)))
    # Update the layout
    fig.update_layout(
        title="Stock Price with Buy/Sell Signals",
        xaxis_title="Date",
        yaxis_title="Stock Price ($)",
        legend_title="Legend",
        height=600
    )
   # Create the portfolio value chart
    fig_portfolio = go.Figure()
    fig_portfolio.add_trace(go.Scatter(x=stock_data.index, y=portfolio_value, mode='lines', name='Portfolio Value', line=dict(color='purple')))
    fig_portfolio.update_layout(
        title="Portfolio Value Over Time",
        xaxis_title="Date",
        yaxis_title="Portfolio Value ($)",
        legend_title="Legend",
        height=600
    )

    # Step 6: Show both charts
    fig.show()
    fig_portfolio.show()
   
    # Step 7: Calculate final portfolio value and total earnings
    final_value = portfolio_value[-1]
    total_earnings = final_value - initial_cash
    # Print final results
    print(f"Final Portfolio Value: ${final_value:.2f}")
    print(f"Total Earnings: ${total_earnings:.2f}")
    
    return final_value, total_earnings

##### 1. Windows for Best Returns

In [3]:
stock_ticker = 'AAPL'
start_date = '2023-01-01'
stock_data, short_window, long_window, optimization_result = find_best_performance_windows(stock_ticker, start_date)
final_value, total_earnings = run_backtest_with_interactive_chart(stock_data, short_window, long_window)

[*********************100%***********************]  1 of 1 completed


Best MA_Short: 25, Best MA_Long: 50, Best Total Return: 33607.53486633301


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  stock_data['Signal'][short_window:] = np.where(stock_data['MA_Short'][short_window:] > stock_data['MA_Long'][short_window:], 1, -1)


Final Portfolio Value: $152347.20
Total Earnings: $52347.20
