In [1]:
from datetime import datetime, timedelta
import itertools
import polars as pl
import pandas as pd
import plotly.graph_objects as go
import swifter
import plotly.express as px
from alpaca.data.requests import StockBarsRequest
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit



  from .autonotebook import tqdm as notebook_tqdm


In [2]:

# Using API Key and Secret Key for authentication
client = StockHistoricalDataClient(api_key='PKLSQ5830Y23KPB8VOB9', secret_key='Yh3AbKig0kbjpn9IbbCNP5fYcuACmTkyqdWGngr8')




In [5]:
symbol = 'NVDA'
timeframe = TimeFrame(1, TimeFrameUnit.Hour)
# use today as end date, format %Y-%m-%d
End_Date = datetime.today()
# use half a year ago as start date
Start_Date = End_Date - timedelta(days=180)


In [6]:
request_params = StockBarsRequest(
                        symbol_or_symbols=[symbol],
                        timeframe=timeframe,
                        start=Start_Date,
                        end=End_Date,
                        adjustment='all'
                        )

In [7]:
stock_bars = client.get_stock_bars(request_params)


In [8]:
stock_df = stock_bars.df


In [30]:
stock_df.to_parquet(r'E:\git_repos\stock_analysis\data\raw_stock_price\test_nvda.parquet')


In [9]:
def calculate_moving_averages(data, short_window, long_window):
    data['Short_MA'] = data['close'].rolling(window=int(short_window)).mean()
    data['Long_MA'] = data['close'].rolling(window=int(long_window)).mean()
    return data


def simulate_trading(data):
    cash = 100000  # Starting cash
    shares_held = 0  # Starting shares
    simulation_results = []  # List to store simulation results at each step

    asset_values = []  # List to store total asset value at each step
    buy_points = []  # List to store buy points
    sell_points = []  # List to store sell points

    # Calculate the normalization factor
    normalization_factor = cash / data['close'].iloc[0]

    # Precompute the conditions for buying and selling
    buy_condition = (data['Short_MA'] > data['Long_MA']) & (data['Short_MA'].shift(1) <= data['Long_MA'].shift(1))
    sell_condition = (data['Short_MA'] < data['Long_MA']) & (data['Short_MA'].shift(1) >= data['Long_MA'].shift(1))

    for i in range(len(data)):
        # Update asset value
        asset_value = cash + shares_held * data['close'].iloc[i]
        asset_values.append(asset_value)

        # Check if the short moving average crosses above the long moving average
        if buy_condition.iloc[i]:
            shares_to_buy = cash // data['close'].iloc[i]
            cash -= shares_to_buy * data['close'].iloc[i]
            shares_held += shares_to_buy
            buy_points.append(asset_value)
            sell_points.append(None)
        # Check if the short moving average crosses below the long moving average
        elif sell_condition.iloc[i]:
            cash += shares_held * data['close'].iloc[i]
            shares_held = 0
            sell_points.append(asset_value)
            buy_points.append(None)
        else:
            buy_points.append(None)
            sell_points.append(None)
        
        # Update simulation results
        simulation_results.append({
            'Date': data.index[i],
            'Cash': cash,
            'Shares Held': shares_held,
            'Asset Value': cash + shares_held * data['close'].iloc[i],
            'Buy Point': buy_condition.iloc[i],
            'Sell Point': sell_condition.iloc[i],
            'Normalized Stock Price': data['close'].iloc[i] * normalization_factor  # Add normalized stock price
        })

    # Create DataFrame
    simulation_df = pd.DataFrame(simulation_results)
    simulation_df.set_index('Date', inplace=True)

    return simulation_df




# Now you can call this function within your test_strategy function

def preprocess_dataframe(df):
    # Split the index into two columns
    df['Symbol'], df['Date'] = zip(*df.index)
    
    # Convert the 'Date' column to datetime and set it as the index
    df['Date'] = pd.to_datetime(df['Date'])
    df.set_index('Date', inplace=True)
    
    return df



In [10]:
def test_strategy(args, data):
    symbol, timeframe, short_window, long_window = args
    data = calculate_moving_averages(data.copy(), short_window, long_window)  # Create a copy to avoid modifying the original data
    simulation_df = simulate_trading(data)
    return simulation_df['Asset Value'].iloc[-1]  # Return the final asset value


In [11]:
def sensitivity_analysis(data_df, short_range, long_range):
    
   
    results_df = pd.DataFrame(index=short_range, columns=long_range)
    
    # Create an iterable for all the combinations of short_window and long_window
    combinations = list(itertools.product(short_range, long_range))  # Convert to list
    
    # Convert the combinations iterable to a DataFrame for use with swifter
    combinations_df = pd.DataFrame(combinations, columns=['short_window', 'long_window'])
    
    # Define a function to apply to each row of the combinations_df DataFrame
    def compute_final_asset_value(row):
        return test_strategy((symbol, timeframe, row['short_window'], row['long_window']), data_df)
    
    # Use swifter to apply the function in parallel
    final_asset_values = combinations_df.swifter.apply(compute_final_asset_value, axis=1)
    
    # Populate the results_df DataFrame with the final asset values
    for i, (short_window, long_window) in enumerate(combinations):
        results_df.at[short_window, long_window] = final_asset_values[i]
    
    return results_df


In [12]:

def analyze_and_visualize(df):
    # Finding the Highest Value and its Indices
    highest_value = df.values.max()
    index_of_highest_value = df.stack().idxmax()  # Returns a tuple of indices (row, column)

    # Calculating the Mean
    mean_value = df.values.mean()

    # Calculating the Median
    median_value = df.median().median()

    # Calculating the Percentage Difference
    percentage_difference = ((highest_value - mean_value) / mean_value) * 100

    # Output the results
    print(f'Highest Value: {highest_value} at {index_of_highest_value}')
    print(f'Mean Value: {mean_value}')
    print(f'Median Value: {median_value}')
    print(f'Percentage Difference: {percentage_difference:.2f}%')

    # Visualizing the Data with a Heatmap using Plotly
    fig = px.imshow(df,
                    labels=dict(x="Long Window", y="Short Window", color="Final Asset Value"),
                    x=df.columns,
                    y=df.index,
                    title='Heatmap of Results'
                   )

    fig.update_xaxes(side="top")  # This moves the x-axis labels to the top of the heatmap for better readability
    fig.show()


In [13]:
short_range = range(1, 20)
long_range = range(14, 100)


In [14]:
results_df = sensitivity_analysis(stock_df, short_range, long_range)


Dask Apply: 100%|██████████| 32/32 [00:45<00:00,  1.44s/it]


In [15]:
# Hour freq
analyze_and_visualize(results_df)

Highest Value: 146553.66650000005 at (10, 16)
Mean Value: 122212.75238745412
Median Value: 122737.67184999997
Percentage Difference: 19.92%


In [13]:
# Day freq
analyze_and_visualize(results_df)


Highest Value: 148254.15 at (17, 16)
Mean Value: 105905.38347613234
Median Value: 109394.82
Percentage Difference: 39.99%


In [22]:
def calculate_moving_averages(data, timeframe):
    # Convert the timeframe to a string
    timeframe_str = str(timeframe)

    # Default values
    short_window = 10
    long_window = 16
    
    # Determine the number of trading hours in a day (assuming 6.5 hours)
    hours_per_day = 1
    # Determine the number of trading minutes in a day
    minutes_per_day = 1
    
    # Set the window size based on the timeframe
    if timeframe_str == '1Day':
        pass  # Use default values
    elif timeframe_str == '1Hour':
        short_window = 5 * hours_per_day
        long_window = 30 * hours_per_day
    elif timeframe_str == '1Min':
        short_window = 5 * minutes_per_day
        long_window = 30 * minutes_per_day
    else:
        print(f"Unexpected timeframe: {timeframe_str}. Using default window sizes.")
    
    data['Short_MA'] = data['close'].rolling(window=int(short_window)).mean()
    data['Long_MA'] = data['close'].rolling(window=int(long_window)).mean()
    return data


def simulate_trading(data):
    cash = 100000  # Starting cash
    shares_held = 0  # Starting shares
    simulation_results = []  # List to store simulation results at each step

    asset_values = []  # List to store total asset value at each step
    buy_points = []  # List to store buy points
    sell_points = []  # List to store sell points

    # Calculate the normalization factor
    normalization_factor = cash / data['close'].iloc[0]

    # Precompute the conditions for buying and selling
    buy_condition = (data['Short_MA'] > data['Long_MA']) & (data['Short_MA'].shift(1) <= data['Long_MA'].shift(1))
    sell_condition = (data['Short_MA'] < data['Long_MA']) & (data['Short_MA'].shift(1) >= data['Long_MA'].shift(1))

    for i in range(len(data)):
        # Update asset value
        asset_value = cash + shares_held * data['close'].iloc[i]
        asset_values.append(asset_value)

        # Check if the short moving average crosses above the long moving average
        if buy_condition.iloc[i]:
            shares_to_buy = cash // data['close'].iloc[i]
            cash -= shares_to_buy * data['close'].iloc[i]
            shares_held += shares_to_buy
            buy_points.append(asset_value)
            sell_points.append(None)
        # Check if the short moving average crosses below the long moving average
        elif sell_condition.iloc[i]:
            cash += shares_held * data['close'].iloc[i]
            shares_held = 0
            sell_points.append(asset_value)
            buy_points.append(None)
        else:
            buy_points.append(None)
            sell_points.append(None)
        
        # Update simulation results
        simulation_results.append({
            'Date': data.index[i],
            'Cash': cash,
            'Shares Held': shares_held,
            'Asset Value': cash + shares_held * data['close'].iloc[i],
            'Buy Point': buy_condition.iloc[i],
            'Sell Point': sell_condition.iloc[i],
            'Normalized Stock Price': data['close'].iloc[i] * normalization_factor  # Add normalized stock price
        })

    # Create DataFrame
    simulation_df = pd.DataFrame(simulation_results)
    simulation_df.set_index('Date', inplace=True)

    return simulation_df




# Now you can call this function within your test_strategy function

def preprocess_dataframe(df):
    # Split the index into two columns
    df['Symbol'], df['Date'] = zip(*df.index)
    
    # Convert the 'Date' column to datetime and set it as the index
    df['Date'] = pd.to_datetime(df['Date'])
    df.set_index('Date', inplace=True)
    
    return df

def test_strategy(symbol, timeframe):
    # Creating request object
    request_params = StockBarsRequest(
                        symbol_or_symbols=[symbol],
                        timeframe=timeframe,
                        start=Start_Date,
                        end=End_Date,
                        adjustment='all'
                        )
    # Retrieve bars for the stock in a DataFrame
    stock_bars = client.get_stock_bars(request_params)
    data = stock_bars.df
    data = calculate_moving_averages(data, timeframe)
    # Call simulate_trading function
    simulation_df = simulate_trading(data)
    
    # Store simulation DataFrame
    simulation_dfs[str(timeframe)] = preprocess_dataframe(simulation_df)


In [23]:
# Dictionary to store simulation DataFrames for each timeframe
simulation_dfs = {}

# Test the strategy on different timeframes
for timeframe in [TimeFrame.Day, TimeFrame.Hour, TimeFrame.Minute]:
    test_strategy('NVDA', timeframe)


In [24]:


def plot_simulation_results(df):
    # Preprocess the dataframe
    #df = preprocess_dataframe(df)
    
    # Create figure
    fig = go.Figure()
    
    # Trace for Asset Value
    trace1 = go.Scatter(
        x=df.index,
        y=df['Asset Value'],
        mode='lines',
        name='Asset Value'
    )
    
    # Trace for Buy Points
    trace2 = go.Scatter(
        x=df[df['Buy Point']].index,
        y=df[df['Buy Point']]['Asset Value'],
        mode='markers',
        name='Buy Point',
        marker=dict(
            symbol='triangle-up',
            size=10,
            color='green'
        )
    )
    
    # Trace for Sell Points
    trace3 = go.Scatter(
        x=df[df['Sell Point']].index,
        y=df[df['Sell Point']]['Asset Value'],
        mode='markers',
        name='Sell Point',
        marker=dict(
            symbol='triangle-down',
            size=10,
            color='red'
        )
    )
    
    # Trace for Normalized Stock Price
    trace4 = go.Scatter(
        x=df.index,
        y=df['Normalized Stock Price'],
        mode='lines',
        name='Normalized Stock Price'
    )
    
    # Add traces to figure
    fig.add_trace(trace1)
    fig.add_trace(trace2)
    fig.add_trace(trace3)
    fig.add_trace(trace4)
    
    # Update layout
    fig.update_layout(
        height=600,
        title_text="Trading Simulation",
        xaxis_title="Date",
        template='plotly_dark'
    )
    
    # Show plot
    fig.show()





In [25]:
simulation_dfs['1Day']

Unnamed: 0_level_0,Cash,Shares Held,Asset Value,Buy Point,Sell Point,Normalized Stock Price,Symbol
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2023-07-17 04:00:00+00:00,100000.00,0.0,100000.0000,False,False,100000.000000,NVDA
2023-07-18 04:00:00+00:00,100000.00,0.0,100000.0000,False,False,102223.753041,NVDA
2023-07-19 04:00:00+00:00,100000.00,0.0,100000.0000,False,False,101326.071513,NVDA
2023-07-20 04:00:00+00:00,100000.00,0.0,100000.0000,False,False,97974.296601,NVDA
2023-07-21 04:00:00+00:00,100000.00,0.0,100000.0000,False,False,95367.360558,NVDA
...,...,...,...,...,...,...,...
2024-01-05 05:00:00+00:00,87360.83,0.0,87360.8300,False,False,105691.774482,NVDA
2024-01-08 05:00:00+00:00,98.32,167.0,87360.8300,True,False,112485.738273,NVDA
2024-01-09 05:00:00+00:00,98.32,167.0,88842.1200,False,False,114395.195143,NVDA
2024-01-10 05:00:00+00:00,98.32,167.0,90862.8200,False,False,116999.978473,NVDA


In [27]:
plot_simulation_results(simulation_dfs['1Day'])
