**Comprehensive Single Stock Market Analysis and Prediction System**

In [2]:
"""
Disclaimer
The content and functionalities of this project are for educational purposes only and are not intended to provide financial advice.
The creator of this project is not a financial advisor, and the information presented in this project should not be considered as financial advice or investment recommendations.
Users should conduct their own research and consult with a qualified professional before making any financial decisions. The creator of this project bears no responsibility for any financial losses incurred as a result of using this project.
"""

'\nDisclaimer\nThe content and functionalities of this project are for educational purposes only and are not intended to provide financial advice. \nThe creator of this project is not a financial advisor, and the information presented in this project should not be considered as financial advice or investment recommendations. \nUsers should conduct their own research and consult with a qualified professional before making any financial decisions. The creator of this project bears no responsibility for any financial losses incurred as a result of using this project.\n'

In [None]:
import numpy as np
import pandas as pd
import yfinance as yf

In [None]:
#return stock info using yfinance
def stock_output(stock):
  return yf.Ticker(stock)

In [None]:
def stock_df(stock, yf_stock):
  #pulls the stock interested in analyzing in 2 different data frames based off of 2 different data pulls then putting both data frames together

  df1 = pd.DataFrame(data=yf.download(stock, period='max'))
  df2 = pd.DataFrame(data=yf_stock.history(period='max'), columns=['Dividends'])

  df1.index = df1.index.tz_localize(None)
  df2.index = df2.index.tz_localize(None)

  stock_info = pd.merge(df1, df2, left_index=True, right_index=True)
  return stock_info

In [None]:
#input single stock for analysis
stock = 'AAPL'
stock_input = stock_output(stock)

In [None]:
stock_info = stock_df(stock, stock_input)

[*********************100%%**********************]  1 of 1 completed


In [None]:
#compares numbers with overall stock market, in this case S&P500
market_stock = '^GSPC'
market_input = stock_output(market_stock)
market_df1 = pd.DataFrame(data=yf.download(market_stock, period='max'))

[*********************100%%**********************]  1 of 1 completed


**Feature Engineering for Financial Analysis:** Create and analyze new features derived from stock data such as: moving averages, volatility, and momentum

In [None]:
def calculate_rolling_mean(df, column_name, window_size):
    """
    Calculate the rolling mean for a specified column in a DataFrame.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data.
    column_name (str): The name of the column for which to calculate the rolling mean.
    window_size (int): The number of periods to consider for the rolling mean.

    Returns:
    pd.Series: A Series containing the rolling mean values.

    Raises:
    KeyError: If the specified column name does not exist in the DataFrame.
    ValueError: If the window size is not a positive integer.
    """
    if column_name not in df.columns:
        raise KeyError(f"Column '{column_name}' does not exist in the DataFrame.")

    if not isinstance(window_size, int) or window_size <= 0:
        raise ValueError("window_size must be a positive integer.")

    return df[column_name].rolling(window=window_size).mean()

In [None]:
def calculate_ewm(df, column_name, span_size, adjust_type):
    """
    Calculate the Exponential Weighted Moving Average (EWMA) for a specified column in a DataFrame.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data.
    column_name (str): The name of the column for which to calculate the EWMA.
    span_size (int): The span size for the EWMA.
    adjust_type (bool): Whether to adjust during the calculation.

    Returns:
    pd.Series: A Series containing the EWMA values.

    Raises:
    KeyError: If the specified column name does not exist in the DataFrame.
    ValueError: If the span size is not a positive integer.
    TypeError: If adjust_type is not a boolean.
    """
    if column_name not in df.columns:
        raise KeyError(f"Column '{column_name}' does not exist in the DataFrame.")

    if not isinstance(span_size, int) or span_size <= 0:
        raise ValueError("span_size must be a positive integer.")

    if not isinstance(adjust_type, bool):
        raise TypeError("adjust_type must be a boolean value.")

    return df[column_name].ewm(span=span_size, adjust=adjust_type).mean()


In [None]:
def calculate_volatility(df, column_name, window_size):
    """
    Calculate the volatility for a specified column in a DataFrame.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data.
    column_name (str): The name of the column for which to calculate the volatility.
    window_size (int): The number of periods to consider for the volatility.

    Returns:
    pd.Series: A Series containing the volatility values.

    Raises:
    KeyError: If the specified column name does not exist in the DataFrame.
    ValueError: If the window size is not a positive integer.
    """

    if column_name not in df.columns:
      raise KeyError(f"Column '{column_name}' does not exist in the DataFrame.")

    if not isinstance(window_size, int) or window_size <= 0:
      raise ValueError("window_size must be a positive integer.")

    return df[column_name].pct_change().rolling(window=window_size).std()*np.sqrt(window_size)

In [None]:
def calculate_rsi(df, column_name, window_size):
    """
    Calculate the Relative Strength Index (RSI) for a specified column in a DataFrame.

    The RSI is a momentum oscillator that measures the speed and change of price movements.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data.
    column_name (str): The name of the column for which to calculate the RSI.
    window_size (int): The window size for calculating the RSI.

    Returns:
    pd.Series: A Series containing the RSI values.

    Raises:
    KeyError: If the specified column name does not exist in the DataFrame.
    ValueError: If the window size is not a positive integer.
    """

    if column_name not in df.columns:
        raise KeyError(f"Column '{column_name}' does not exist in the DataFrame.")

    if not isinstance(window_size, int) or window_size <= 0:
        raise ValueError("window_size must be a positive integer.")

    delta = df[column_name].diff()
    gains = delta.clip(lower=0)
    loss = -1*delta.clip(upper=0)
    avg_gain = gains.rolling(window=window_size, min_periods=window_size).mean()
    avg_loss = loss.rolling(window=window_size, min_periods=window_size).mean()

    rs = avg_gain/avg_loss
    rsi = 100 - (100/(1+rs))

    return rsi

In [None]:
def calculate_bollinger(df, column_name, window_size, num_std_dev):
    """
    Calculate Bollinger Bands for a specified column in a DataFrame.

    Bollinger Bands consist of a middle band being an N-period simple moving average (SMA),
    an upper band at K times an N-period standard deviation above the middle band,
    and a lower band at K times an N-period standard deviation below the middle band.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data.
    column_name (str): The name of the column for which to calculate Bollinger Bands.
    window_size (int): The window size for calculating the SMA and standard deviation.
    num_std_dev (float): The number of standard deviations to use for creating the bands.

    Returns:
    pd.DataFrame: A DataFrame containing the Bollinger Upper and Lower bands.

    Raises:
    KeyError: If the specified column name does not exist in the DataFrame.
    ValueError: If the window size is not a positive integer or if num_std_dev is not a positive number.
    """

    if column_name not in df.columns:
        raise KeyError(f"Column '{column_name}' does not exist in the DataFrame.")

    if not isinstance(window_size, int) or window_size <= 0:
        raise ValueError("window_size must be a positive integer.")

    if not isinstance(num_std_dev, (int, float)) or num_std_dev <= 0:
        raise ValueError("num_std_dev must be a positive number.")

    sma = df[column_name].rolling(window=window_size).mean()
    std = df[column_name].rolling(window=window_size).std()
    bollinger_upper = sma + (num_std_dev*std)
    bollinger_lower = sma - (num_std_dev*std)
    return pd.DataFrame(
      {
        'Bollinger_Upper': bollinger_upper,
        'Bollinger_Lower': bollinger_lower
      }
    )

In [None]:
def calculate_dividend_yield(df, dividend_col, close_col, window_size):
    """
    Calculate the rolling annual dividend and daily dividend yield for a specified stock.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the stock data.
    dividend_col (str): The name of the column containing dividend data.
    close_col (str): The name of the column containing closing price data.
    window_size (int or str): The window size for calculating rolling sums.
                              Can be an integer (number of periods) or a string (time period).

    Returns:
    pd.DataFrame: A DataFrame containing the rolling annual dividend and daily dividend yield.

    Raises:
    KeyError: If the specified dividend or closing price column names do not exist in the DataFrame.

    """
    if dividend_col not in df.columns:
        raise KeyError(f"Column '{dividend_col}' does not exist in the DataFrame.")

    if close_col not in df.columns:
        raise KeyError(f"Column '{close_col}' does not exist in the DataFrame.")

    if isinstance(window_size, int):
      annual_dividend = df[dividend_col].rolling(window=window_size).sum()

    elif isinstance(window_size, str):
      annual_dividend = df[dividend_col].rolling(window=window_size, min_periods=1).sum()

    else:
      raise ValueError("window_size must be an integer or a string representing a time period.")

    daily_dividend = (annual_dividend/df[close_col])*100
    return pd.DataFrame({
        'Rolling Annual Dividend': annual_dividend,
        'Daily Dividend': daily_dividend
    }
    )

In [None]:
stock_info['SMA_50'] = calculate_rolling_mean(stock_info, 'Adj Close', 50)

In [None]:
stock_info['SMA_20'] = calculate_rolling_mean(stock_info, 'Adj Close', 20)

In [None]:
stock_info['EMA_50'] = calculate_ewm(stock_info, 'Adj Close', 50, False)

In [None]:
stock_info['Vol_MA_50'] = calculate_rolling_mean(stock_info, 'Volume', 50)

In [None]:
stock_info['Volatility'] = calculate_volatility(stock_info, 'Adj Close', 50)

In [None]:
stock_info['RSI'] = calculate_rsi(stock_info, 'Close', 14)

In [None]:
bollinger_data = calculate_bollinger(stock_info, 'Adj Close', 20, 2)
stock_info = pd.concat([stock_info, bollinger_data], axis=1)

In [None]:
stock_info['SMA_200'] = calculate_rolling_mean(stock_info, 'Adj Close', 20)

In [None]:
ema_26 = calculate_ewm(stock_info, 'Adj Close', 26, True)
ema_12 = calculate_ewm(stock_info, 'Adj Close', 12, True)

stock_info['MACD_Line'] = ema_12 - ema_26
stock_info['Signal Line'] = calculate_ewm(stock_info, 'MACD_Line', 9, True)

In [None]:
dividend_data = calculate_dividend_yield(stock_info, 'Dividends', 'Close', '365D')
stock_info = pd.concat([stock_info, dividend_data], axis=1)

In [None]:
#put data into quarterly information
stock_info_qtrly = stock_info.groupby(pd.Grouper(freq='QS')).first()

In [None]:
#Quarterly Returns

# Resample to get the price at the start of each quarter
quarterly_start = stock_info['Adj Close'].resample('QS').first()
# Resample to get the price at the end of each quarter
quarterly_end = stock_info['Adj Close'].resample('Q').last()
quarterly_end = quarterly_end.groupby(pd.Grouper(freq='QS')).first()

# Calculate quarterly returns
quarterly_returns = (quarterly_end - quarterly_start) / quarterly_start
quarterly_returns

qtrly_returns_df = quarterly_returns.to_frame('Quarterly_Returns')

In [None]:
#final dataframe with all the columns we need before scoring
final_df = pd.merge(stock_info_qtrly, qtrly_returns_df, left_index=True, right_index=True, how='inner')

In [None]:
# Calculate daily returns
stock_info['Return_Stock'] = stock_info['Adj Close'].pct_change()
market_df1['Return_Market'] = market_df1['Adj Close'].pct_change()

# Merge the return data
aligned_data = pd.merge(stock_info[['Return_Stock']], market_df1[['Return_Market']],
                        left_index=True, right_index=True, how='inner',
                        suffixes=('_stock', '_market'))

# Calculate covariance and market variance
covariance = aligned_data['Return_Stock'].cov(aligned_data['Return_Market'])
market_variance = aligned_data['Return_Market'].var()

# Calculate Beta
beta = covariance / market_variance

print("covariance:", covariance)
print("market_variance", market_variance)
print("beta", beta)


covariance: 0.00015581583754240052
market_variance 0.0001279710891639725
beta 1.217586242020257


**Multi Stock Analysis:** a list of stock tickers to output a list of stocks that are oversold and potentially a good entry point utilizing many metrics such as Volume, Moving Averages, and RSI

In [None]:
#List stocks interested to see if it is currently a good opportunity to entry
stock_list = ['AAPL', 'AMZN', 'GOOGL', 'TGT', 'MSFT', 'SOFI']

In [None]:
stock_list_df = pd.DataFrame(columns=['Ticker', 'Close', 'Volume', 'Volume_MA_20', 'RSI'])

In [None]:
for i in stock_list:
  data = pd.DataFrame(data=yf.download(i, period='max'))
  data = data[['Close', 'Volume']]
  data['Ticker'] = i
  data['Volume_MA_20'] = calculate_rolling_mean(data, 'Volume', 20)
  data['Volume_Prev_Day'] = data['Volume'].shift(1)
  data['RSI'] = calculate_rsi(data, 'Close', 14)

  data = data[['Ticker', 'Close', 'Volume', 'Volume_MA_20', 'RSI']]
  stock_list_df = pd.concat([data, stock_list_df], axis=0)
#create method for this, update capability to update RSI, and Volume values

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


In [None]:
from datetime import datetime
import pytz
# Get the current time in UTC
current_time_utc = datetime.now(pytz.utc)

# Convert to Eastern Standard Time (EST)
est_timezone = pytz.timezone('US/Eastern')
current_time_est = current_time_utc.astimezone(est_timezone)

# Get today's date
today = datetime.today()

is_business_day = 0

# Check if today is a business day
if today in pd.bdate_range(start=today, end=today):
    is_business_day = 1

In [None]:
#if the current business day is over, then use the most recent day stock information, else use the days before
#idea is to just try and pull about 30 days worth of information
if current_time_est.hour > 16 and is_business_day == 1:
  max_date = stock_list_df.index.max()
else:
  max_date = stock_list_df.index.max()-pd.Timedelta(days=1)

start_date = max_date-pd.Timedelta(days=30)

stock_list_df = stock_list_df[stock_list_df.index >= start_date]

In [None]:
stock_list_df_final = stock_list_df.copy()

In [None]:
stock_list_df_recent = stock_list_df[stock_list_df.index==max_date].copy()

In [None]:
#Remember, the analysis provided here is for educational purposes only and should not be considered financial advice.

In [None]:
def classify_buying_opportunity(row):
    if row['RSI'] < 30:
        if row['Volume'] > row['Volume_MA_20']:
            return 'Excellent Buy Opportunity'
        else:
            return 'Good Buy Opportunity'
    elif row['RSI'] > 70:
        if row['Volume'] > row['Volume_MA_20']:
            return 'Slightly Bad Buy Opportunity'
        else:
            return 'Worst Buy Opportunity'
    else:
        if row['Volume'] > row['Volume_MA_20']:
            return 'Neutral Buy Opportunity, High Volume'
        else:
            return 'Neutral Buy Opportunity, Low Volume'


# Apply the function to each row
stock_list_df_recent['Buying Opportunity'] = stock_list_df_recent.apply(classify_buying_opportunity, axis=1)

In [None]:
stock_list_df_recent['Close'] = stock_list_df_recent['Close'].round(2)
stock_list_df_recent['RSI'] = stock_list_df_recent['RSI'].round(2)
stock_list_df_recent['Volume_MA_20'] = stock_list_df_recent['Volume'].astype(int)
stock_list_df_recent['Volume'] = stock_list_df_recent['Volume'].astype(int)

In [None]:
stock_list_df_final.index.rename('Date', inplace=True)
stock_list_df_final.set_index('Ticker', append=True, inplace=True)
stock_list_df_final = stock_list_df_final.swaplevel('Ticker', 'Date')

In [None]:
#Enter stock interested from the stock list picked earlier
stock = 'SOFI'

stock_df = stock_list_df_final.loc[stock].copy()

In [None]:
"""
Create two graphs:
1) volume vs volume MA 20: show how most recent close day volume is compared to the 30 day moving average
2) RSI graph: show the historical trend of RSI the last ~30 days
"""

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10,6), sharex=True)

ax1.plot(stock_df.index, stock_df['Volume'], label='Volume')
ax1.plot(stock_df.index, stock_df['Volume_MA_20'], label='Volume MA 20')
ax1.set_ylabel('Volume')
ax1.legend(loc='upper left')
ax1.set_xlabel('Date')


ax2.plot(stock_df.index, stock_df['RSI'], label='RSI')
ax2.set_ylabel('RSI')
ax2.axhline(y=70, color='red', linestyle='--')
ax2.axhline(y=30, color='green', linestyle='--')
ax2.legend(loc='upper left')
ax2.set_xlabel('Date')

plt.show()

<IPython.core.display.Javascript object>

**Historical Data Analysis and Visualization:** Visualize historical stock market data and provide insights of past trend and patterns

In [None]:
#Trend Analysis: Line graph of closing price with SMA and EMA
%matplotlib notebook
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

In [None]:
#trend analysis - closing price over time with SMA/EMA
#enter start_date you want
start_date = '2020-01-01'
#enter end_date you want
end_date = datetime.now()
date_index = pd.date_range(start_date, end_date, freq='QS')
trend_df = pd.DataFrame(data=final_df, index=date_index)
trend_df.index.name = 'Date'
trend_df.index = pd.to_datetime(trend_df.index)

In [None]:
def plot_line_chart(df, x_column, y_columns, title, xlabel, ylabel):
	"""
    Plot a chart with three subplots sharing the x-axis from a DataFrame.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data.
    col1 (str): Column name for the first plot.
    col2 (str): Column name for the second plot.
    col3 (str): Additional data for the second plot (usually for comparison or highlighting).
    col4 (str): Column name for the third plot.
    title (str): Overall title of the combined plot.
    xlabel (str): Label for the shared x-axis.

    Returns:
    None: This function displays the plot but does not return anything.
  """
	plt.figure(figsize=(10,6))
	sns.set_style("darkgrid")

	for y_column in y_columns:
		sns.lineplot(x=x_column, y=y_column, data=df, label=y_column)

	plt.title(title)
	plt.xlabel(xlabel)
	plt.ylabel(ylabel)
	plt.legend(loc='best')
	plt.gcf().autofmt_xdate()
	plt.show()

In [None]:
def two_axes_plot(df, col1, col2, title):
	"""
  Plot a dual-axis chart with bar and line plots from a DataFrame.

  Parameters:
  df (pd.DataFrame): The DataFrame containing the data.
  col1 (str): Column name for the bar plot (primary y-axis).
  col2 (str): Column name for the line plot (secondary y-axis).
  title (str): Title of the plot.

  Returns:
  None: This function displays the plot but does not return anything.
	"""
	fig, ax1 = plt.subplots(figsize=(10, 6))
	ax1.bar(df.index, df[col1], label=col1, color='#0E65A3', width=20)
	plt.gcf().autofmt_xdate()
	plt.xticks(fontsize=8)
	plt.title(title)
	ax1.set_ylabel(col1)
	ax1.set_xlabel('Date')
	ax1.legend(loc='upper left')
	plt.xticks(fontsize=8)

	ax2 = ax1.twinx()
	ax2.plot(df.index, df[col2], label=col2, color='#F79500')
	ax2.set_ylabel(col2)
	ax2.legend(loc='upper right')

	plt.show()

In [None]:
def three_axes_plot(df, col1, col2, col3, col4, title, xlabel):
    """
    Plot a chart with three subplots sharing the x-axis from a DataFrame.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data.
    col1 (str): Column name for the first plot.
    col2 (str): Column name for the second plot.
    col3 (str): Additional data for the second plot (usually for comparison or highlighting).
    col4 (str): Column name for the third plot.
    title (str): Overall title of the combined plot.
    xlabel (str): Label for the shared x-axis.

    Returns:
    None: This function displays the plot but does not return anything.
    """
    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(10,6), sharex=True)
    ax1.plot(df.index, df[col1], label=col1)
    ax1.set_ylabel(col1)
    ax1.legend(loc='upper left')

    ax2.plot(df.index, df[col2], label=col2)
    ax2.plot(df.index, df[col3], label=col3)
    ax2.axhline(y=0, color='black', linestyle='--')
    ax2.set_ylabel(col2)
    ax2.legend(loc='upper left')

    ax3.plot(df.index, df[col4], label=col4)
    ax3.set_ylabel(col4)
    ax3.axhline(y=70, color='red', linestyle='--')
    ax3.axhline(y=30, color='blue', linestyle='--')
    ax3.legend(loc='upper left')
    ax3.set_xlabel(xlabel)

    plt.suptitle(title)
    plt.gcf().autofmt_xdate()
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.show()


In [None]:
plot_line_chart(trend_df, 'Date', ['Close', 'SMA_50', 'EMA_50'], 'Stock Price Over Time', 'Date', 'Close')

<IPython.core.display.Javascript object>

In [None]:
two_axes_plot(trend_df, 'Volume', 'Close', 'Volume and Closing Price Over Time')

<IPython.core.display.Javascript object>

In [None]:
two_axes_plot(trend_df, 'Volume', 'Volatility', 'Volatility and Volume Over Time')

<IPython.core.display.Javascript object>

In [None]:
three_axes_plot(trend_df, 'Close', 'MACD_Line', 'Signal Line', 'RSI', 'Comprehensive Stock Trend Analysis: Price, MACD, and RSI', 'Date')

<IPython.core.display.Javascript object>

**Stock Scoring and Classification:** Create a scoring method based on many indicators to help with decision making for buying, selling or holding a particular stock.

In [None]:
def calculate_score_vectorized(df, score_column, conditions, scores):
    """
    Calculate scores for a DataFrame based on specified conditions and assign them to a new column.

    Parameters:
    df (pd.DataFrame): The DataFrame to operate on.
    score_column (str): The name of the new column where the scores will be stored.
    conditions (list): A list of boolean conditions based on which scores are assigned.
    scores (list): A list of scores corresponding to each condition in 'conditions'.

    Raises:
    TypeError: If 'scores' does not contain integers.
    ValueError: If the lengths of 'conditions' and 'scores' do not match.
    """

    if not all(isinstance(score, int) for score in scores):
        raise TypeError("All elements in 'scores' must be integers.")

    if len(conditions) != len(scores):
        raise ValueError("The length of 'conditions' and 'scores' must be the same.")

    df[score_column] = np.select(conditions, scores, default=np.nan)

In [None]:
def add_final_score(df, cols_copy, score_col, weight, metric_name, year):
    """
    Calculate normalized and weighted scores for a given metric and add them to a DataFrame.

    Parameters:
    df (pd.DataFrame): DataFrame containing the data.
    cols_copy (str): Column name to be used for scoring.
    score_col (str): Column name where the scores are stored.
    weight (float): Weight factor to apply to the final score.
    metric_name (str): Name of the metric for identification in the output.
    year (int or str): Year from which to consider the data for scoring.

    Returns:
    pd.DataFrame: A DataFrame with calculated metric name, final score, and weighted score.

    Raises:
    KeyError: If 'cols_copy' or 'score_col' is not in the DataFrame's columns.
    TypeError: If 'weight' is not a float
    ValueError: If 'weight' is not positive.
    """
    # Error checks
    if cols_copy not in df.columns:
        raise KeyError(f"Column '{cols_copy}' does not exist in the DataFrame.")

    if score_col not in df.columns:
        raise KeyError(f"Column '{score_col}' does not exist in the DataFrame.")

    if not isinstance(weight, float) or weight <= 0:
        raise ValueError("Weight must be a positive float.")

    df_final = df[[cols_copy]].copy()
    df_final.dropna(inplace=True)
    df_final = df_final[df_final.index >= year]
    #(value-min/ max-min)
    df_final['Normalized Score'] = (df_final[score_col] - df_final[score_col].min())/(df_final[score_col].max() - df_final[score_col].min())
    average_score = df_final['Normalized Score'].mean()
    weighted_score = average_score * weight

    return pd.DataFrame(
      [{'Metric': metric_name,
        'Final Score': average_score,
        'Weighted Score': weighted_score
        }
      ])

In [None]:
def concat_final_score(original_df, new_df, axis):
    """
    Concatenates two DataFrames along the index axis.

    Parameters:
    original_df (pd.DataFrame): The original DataFrame to concatenate.
    new_df (pd.DataFrame): The new DataFrame to be added.

    Returns:
    pd.DataFrame: The concatenated DataFrame.

    Raises:
    TypeError: If either 'original_df' or 'new_df' is not a DataFrame.
    """

    if not isinstance(original_df, pd.DataFrame):
        raise TypeError("original_df must be a pandas DataFrame.")

    if not isinstance(new_df, pd.DataFrame):
        raise TypeError("new_df must be a pandas DataFrame.")

    return pd.concat([original_df, new_df], axis=0, ignore_index=True)

In [None]:
def quantile_scoring(column):
    """
    Assigns scores to a pandas Series based on its quantiles.

    Parameters:
    column (pd.Series): A pandas Series for which scores are to be calculated.

    Returns:
    np.array: An array of scores based on quantile thresholds.
    """

    lower_quantile = column.quantile(.25)
    upper_quantile = column.quantile(.75)

    scores = np.select(
        [column > upper_quantile, column > lower_quantile, column <= lower_quantile],
        [10, 5, 0],
        default=np.nan
    )
    return scores

In [None]:
final_scores = pd.DataFrame(columns=['Metric', 'Final Score', 'Weighted Score'])

In [None]:
##Price Indicators (Close, Adj Close)
sorted_df = stock_info[['Close', 'Adj Close', 'SMA_20', 'SMA_50', 'SMA_200']]
sorted_df = sorted_df.sort_index(ascending=False).head(20)
sorted_df['SMA_Slope'] = (sorted_df['SMA_20'] - sorted_df['SMA_20'].shift(5))/5
sorted_df_cleaned = sorted_df[['SMA_Slope']].dropna()

strong_positive_threshold = np.percentile(sorted_df_cleaned, 80)
slight_positive_threshold = np.percentile(sorted_df_cleaned, 60)
neutral_threshold = np.percentile(sorted_df_cleaned, 40)
slight_negative_threshold = np.percentile(sorted_df_cleaned, 20)
strong_negative_threshold = np.percentile(sorted_df_cleaned, 0)

conditions = [
sorted_df_cleaned['SMA_Slope'] > strong_positive_threshold,
sorted_df_cleaned['SMA_Slope'] > slight_positive_threshold,
sorted_df_cleaned['SMA_Slope'] > neutral_threshold,
sorted_df_cleaned['SMA_Slope'] > slight_negative_threshold,
sorted_df_cleaned['SMA_Slope'] >= strong_negative_threshold
]

scores = [10,5,0,-5,-10]

calculate_score_vectorized(sorted_df_cleaned, 'SMA_Slope_Score', conditions, scores)

In [None]:
slope_df = add_final_score(sorted_df_cleaned, 'SMA_Slope_Score', 'SMA_Slope_Score', .15, 'Price Indicator', sorted_df_cleaned.index.min())

In [None]:
final_scores = concat_final_score(final_scores, slope_df,axis=0)

In [None]:
#Volume Indicator

"""
Strong Positive Signal: Current volume is more than 50% above the 'Vol_MA_50'.
Moderate Positive Signal: Current volume is 20-50% above the 'Vol_MA_50'.
Neutral: Current volume is around the 'Vol_MA_50' range.

"""

volume_df = stock_info[['Volume', 'Vol_MA_50']].copy()

volume_difference = volume_df['Volume'] - volume_df['Vol_MA_50']
strong_positive_threshold = volume_df['Vol_MA_50'] * 0.5
neutral_threshold = volume_df['Vol_MA_50'] * 0.2
strong_negative_threshold = -volume_df['Vol_MA_50'] * 0.2

conditions = [
volume_difference > strong_positive_threshold,
volume_difference > neutral_threshold,
volume_difference > strong_negative_threshold
]

scores=[10,5,0]

calculate_score_vectorized(volume_df, 'Volume Signal Score', conditions, scores)

In [None]:
volume_df_score = add_final_score(volume_df, 'Volume Signal Score', 'Volume Signal Score', .15, 'Volume Indicator', '2005')

In [None]:
final_scores = concat_final_score(final_scores, volume_df_score,axis=0)

In [None]:
#Trend Indicators
score_df = stock_info[['SMA_50', 'SMA_20', 'EMA_50', 'Close']].copy()

"""
Compare the current closing price ('Close') to SMA_50 and EMA_50.
If the price is above these averages, it typically indicates a bullish trend.
Conversely, if it's below, it indicates a bearish trend.
"""

def trend_scoring(row):
    score = 0
    if row['Close'] > row['SMA_50']:
        score += 3

    if row['Close'] > row['EMA_50']:
        score += 3

    if row['SMA_20'] > row['SMA_50']:
        score += 4

    return score

score_df['Trend_Score'] = score_df.apply(lambda row: trend_scoring(row), axis=1)

In [None]:
trend_score_df = add_final_score(score_df, 'Trend_Score', 'Trend_Score', .18, 'Trend Indicator', '2005')

In [None]:
final_scores = concat_final_score(final_scores, trend_score_df,axis=0)

In [None]:
volatility_df = stock_info[['Volatility']].copy()
volatility_df['Volatility_Score'] = quantile_scoring(volatility_df['Volatility'])
volatility_score_df = add_final_score(volatility_df, 'Volatility_Score', 'Volatility_Score', .08, 'Volatility Indicator',  '2005')

In [None]:
final_scores = concat_final_score(final_scores, volatility_score_df, axis=0)

In [None]:
#Momentum Indicators (RSI, MACD_Line, Signal Line)
"""
RSI (Relative Strength Index): Measures the speed and change of price movements.
Typically, an RSI above 70 indicates a stock may be overbought (potentially bearish signal),
while an RSI below 30 suggests it may be oversold (potentially bullish signal).

MACD Line: Shows the relationship between two moving averages of a stock’s price.
The MACD Line crossing above the Signal Line can be a bullish signal, while crossing below can be bearish.

Signal Line: A moving average of the MACD Line, used as a trigger for buy and sell signals.

RSI Scoring:
High Score: RSI between 30 and 70 (neutral or normal momentum).
Lower Score: RSI above 70 (overbought) or below 30 (oversold).
MACD Line and Signal Line Crossover Scoring:

Assign a positive score if the MACD Line is above the Signal Line (bullish momentum).
Assign a negative score if the MACD Line is below the Signal Line (bearish momentum).
"""

momentum_df = stock_info[['RSI', 'MACD_Line', 'Signal Line']].copy()

def momentum_scoring(row):
    score = 0
    if row['RSI'] >= 30 and row['RSI'] <= 70:
        score += 5
    elif row['RSI'] > 70:
        score -= 5
    if row['MACD_Line'] > row['Signal Line']:
        score += 5
    elif row['MACD_Line'] < row['Signal Line']:
        score -= 5

    return score

momentum_df['Momentum_Score'] = momentum_df.apply(lambda row: momentum_scoring(row), axis=1)

In [None]:
momentum_df_score = add_final_score(momentum_df, 'Momentum_Score', 'Momentum_Score', .18, 'Momentum Indicator', '2005')

In [None]:
final_scores = concat_final_score(final_scores, momentum_df_score, axis=0)

In [None]:
#Bollinger Bands (Upper, Lower)
bollinger_df = stock_info[['Bollinger_Upper', 'Bollinger_Lower', 'Close']].copy()

def bollinger_score(row):
    score = 0
    if row['Close'] >= row['Bollinger_Upper']:
        score += 10
    elif row['Close'] <= row['Bollinger_Lower']:
        score -= 10
    return score

bollinger_df['Bollinger_Score'] = bollinger_df.apply(lambda row: bollinger_score(row), axis=1)

In [None]:
bollinger_score_df = add_final_score(bollinger_df, 'Bollinger_Score', 'Bollinger_Score', .08, 'Bollinger Bands', bollinger_df.index.min())

In [None]:
final_scores = concat_final_score(final_scores, bollinger_score_df, axis=0)

In [None]:
#Dividend Indicators (Dividends, Annual Dividend, Dividend Yield)
annual_dividends = stock_info['Dividends'].resample('Y').sum()
annual_close_price = stock_info['Close'].resample('Y').last()

dividend_growth = annual_dividends.pct_change()
price_growth = annual_close_price.pct_change()

combined_growth = pd.concat([dividend_growth, price_growth], axis=1)
combined_growth.columns = ['Dividend_Growth', 'Price_Growth']

def dividend_score(row):
    score = 0
    if row['Dividend_Growth'] > 0 and row['Price_Growth'] > 0:
        score += 10
    elif row['Dividend_Growth'] < 0 and row['Price_Growth'] < 0:
        score -=10
    return score

combined_growth['Dividend_Score'] = combined_growth.apply(lambda row: dividend_score(row), axis=1)

In [None]:
combined_growth_final = combined_growth.copy()
combined_growth_final.dropna(inplace=True)
combined_growth_final.drop(columns=['Dividend_Growth', 'Price_Growth'], inplace=True)
combined_growth_final['Normalized_Score'] = (combined_growth_final['Dividend_Score'] - combined_growth_final['Dividend_Score'].min()) / (combined_growth_final['Dividend_Score'].max() - combined_growth_final['Dividend_Score'].min())
total_score = combined_growth_final['Dividend_Score'].sum()
average_dividend_score = combined_growth_final['Normalized_Score'].mean()

if not np.isnan(average_dividend_score) and total_score > 0:
  weighted_dividend_score = average_dividend_score * .05
elif np.isnan(average_dividend_score) and total_score > 0:
  average_dividend_score = 1
  weighted_dividend_score = average_dividend_score*.05
else:
  weighted_dividend_score = np.nan
#check if company offers dividends or not

dividend_score_df = pd.DataFrame(
        [
            {
                'Metric': 'Dividend Indicator',
                'Final Score': average_dividend_score,
                'Weighted Score': weighted_dividend_score
            }
        ]
    )

final_scores = concat_final_score(final_scores, dividend_score_df, axis=0)


In [None]:
#Performance Metrics (Quarterly Returns)

qtr_performance = stock_info['Adj Close'].resample('Q').last().copy()
qtr_growth = qtr_performance.pct_change()

qtr_perf_df = pd.concat([qtr_performance, qtr_growth], axis=1)
qtr_perf_df.columns = ['Quarter_Close', 'Quarterly_Growth']

high_threshold = qtr_perf_df['Quarterly_Growth'].quantile(.75)
med_threshold = qtr_perf_df['Quarterly_Growth'].quantile(.50)
low_threshold = qtr_perf_df['Quarterly_Growth'].quantile(.25)

def qtrly_score(row):
    score = 0
    if row['Quarterly_Growth'] >= high_threshold:
        score += 10
    elif row['Quarterly_Growth'] < high_threshold and row['Quarterly_Growth'] >= med_threshold:
        score += 5
    elif row['Quarterly_Growth'] < med_threshold and row['Quarterly_Growth'] >= low_threshold:
        score += 2
    return score

qtr_perf_df['Quarter_Score'] = qtr_perf_df.apply(lambda x: qtrly_score(x), axis=1)

In [None]:
qtr_score_df = add_final_score(qtr_perf_df, 'Quarter_Score', 'Quarter_Score', .10, 'Quarterly Performance Indicator', '2005')

In [None]:
final_scores = concat_final_score(final_scores, qtr_score_df, axis=0)

In [None]:
#Recent Earnings Data
stock_input.quarterly_income_stmt
earnings_col = [
    'Total Revenue',
    'Gross Profit',
    'Operating Income',
    'Net Income',
    'EBITDA',
    #'Diluted EPS',
    #'Basic EPS',
    'Pretax Income',
    'Tax Provision'
]

earnings_df = pd.DataFrame(stock_input.quarterly_income_stmt.T)
earnings_df.index = pd.to_datetime(earnings_df.index)

earnings_df = earnings_df[earnings_col].copy()

#add additional metrics for calculation
earnings_df['Total Revenue Growth'] = earnings_df['Total Revenue'].pct_change()
earnings_df['Gross Profit Margin'] = earnings_df['Gross Profit'] / earnings_df['Total Revenue']
earnings_df['Operating Income Margin'] = earnings_df['Operating Income'] / earnings_df['Total Revenue']
earnings_df['Net Income Margin'] = earnings_df['Net Income'] / earnings_df['Total Revenue']
earnings_df['EBITDA Margin'] = earnings_df['EBITDA'] / earnings_df['Total Revenue']
#earnings_df['EPS Growth Diluted'] = earnings_df['Diluted EPS'].pct_change()
#earnings_df['EPS Growth Basic'] = earnings_df['Basic EPS'].pct_change()
earnings_df['Pretax Income Margin'] = earnings_df['Pretax Income'] / earnings_df['Total Revenue']
earnings_df['Effective Tax Rate'] = earnings_df['Tax Provision'] / earnings_df['Pretax Income']

In [None]:
earning_score_df = earnings_df[['Total Revenue Growth', 'Gross Profit Margin', 'Operating Income Margin', 'Net Income Margin',
                   'EBITDA Margin', 'Pretax Income Margin', 'Effective Tax Rate']].copy()   #'EPS Growth Diluted', #'EPS Growth Basic',

for col in earning_score_df:
  earning_score_df[f'{col} Score'] = (quantile_scoring(earning_score_df[col]))


In [None]:
"""
Normalize score, if NaN take it out of the total when averaging out
each row, sum up the total score for each column
"""
for row in earning_score_df.index:
    total_score = 0
    total_metrics = 0
    for col in earning_score_df.columns:
        # Check if column name ends with 'Score' and is not 'Total_Score'
        if col.endswith('Score') and col != 'Total_Score':
            if not pd.isna(earning_score_df.loc[row, col]):
                total_score += earning_score_df.loc[row, col]
                total_metrics += 1

    earning_score_df.loc[row, 'Total_Score'] = total_score
    earning_score_df.loc[row, 'Total_Metrics'] = total_metrics

    if total_metrics == 0:
        earning_score_df.loc[row, 'Final_Scores'] = np.nan
    else:
        earning_score_df.loc[row, 'Final_Scores'] = total_score / total_metrics



In [None]:
earnings_score_df = add_final_score(earning_score_df, 'Final_Scores', 'Final_Scores', .10, 'Recent Earnings Data', earning_score_df.index.min())

In [None]:
final_scores = concat_final_score(final_scores, earnings_score_df, axis=0)

In [None]:
total_stock_score = final_scores['Final Score'].sum()
total_count = final_scores['Final Score'].count()
normalized_final_score = total_stock_score/total_count

print("Total Score", total_score)
print("Total Count", total_count)
print('Normalized Final Score', normalized_final_score)

Total Score 50.0
Total Count 9
Normalized Final Score 0.5694626735501381


In [None]:
#Remember, the analysis provided here is for educational purposes only and should not be considered financial advice.

In [None]:
"""
Strong Buy: If the score is in the top 20% of the range (e.g., > 0.8).
Buy: If the score is between the top 40% and 20% (e.g., > 0.6 and ≤ 0.8).
Hold: Middle 20% (e.g., > 0.4 and ≤ 0.6).
Sell: Lower 40% to 20% (e.g., > 0.2 and ≤ 0.4).
Strong Sell: Bottom 20% (e.g., ≤ 0.2).
"""

if normalized_final_score > .8:
  recommendation = 'Strong Buy'
elif normalized_final_score >.6 and normalized_final_score <=.8:
  recommendation = 'Buy'
elif normalized_final_score >.4 and normalized_final_score <=.6:
  recommendation = 'Hold'
elif normalized_final_score >.2 and normalized_final_score <=.4:
  recommendation = 'Sell'
elif normalized_final_score <= .2:
  recommendation = 'Strong Sell'

print(f"{stock} recommended action is to: {recommendation}")

AAPL recommended action is to: Hold


In [None]:
#Scores with weight applied, weighted score max is 1
final_scores

Unnamed: 0,Metric,Final Score,Weighted Score
0,Price Indicator,0.5,0.075
1,Volume Indicator,0.237987,0.035698
2,Trend Indicator,0.802691,0.144484
3,Volatility Indicator,0.256416,0.020513
4,Momentum Indicator,0.618871,0.111397
5,Bollinger Bands,0.906811,0.072545
6,Dividend Indicator,0.717391,0.03587
7,Quarterly Performance Indicator,0.44026,0.044026
8,Recent Earnings Data,0.644737,0.064474


**Machine Learning for Price Prediction:** Utilize PCA, Random Forest and Linear Regression for prediction stock prices



In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn import metrics

In [None]:
"""
X_train - features that are used to train the data
X_test - feature data that is used to test the data to see how accurate the model is
y_train - actual values that we use to train the values
y_test - actual values that we are using to evaluate how accurate the model is

train - using dataset test
test - predicting values to compare with actuals

test_size - dataset size we are using train the dataset (.3-.4)
random_state - random splits
"""

X_temp = stock_info[['Volume', 'SMA_50', 'SMA_20', 'EMA_50', 'Vol_MA_50', 'Volatility', 'RSI', 'MACD_Line', 'Bollinger_Upper', 'Bollinger_Lower']].copy()
X = X_temp[X_temp.index > '2005']

y_temp = stock_info['Close'].copy()
y = y_temp[y_temp.index > '2005']


X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.4, random_state=101
)

lm = LinearRegression()

#only fit model with training data
lm.fit(X_train, y_train)

In [None]:
print(lm.intercept_)

2.293270005962846


In [None]:
lm.coef_

array([-1.19166896e-10,  8.77444632e-01,  1.25652749e-02,  7.51628310e-02,
       -2.14872464e-09,  7.15570338e-01,  1.07846954e-02,  3.26301081e+00,
        1.22466989e-02,  1.28838510e-02])

In [None]:
#a 1 unit increase in volume, will lead to a drop / increase in coefficient
cdf = pd.DataFrame(lm.coef_, X.columns, columns=['Coeff'])

In [None]:
#predict just past in features, predict the y values
y_pred = lm.predict(X_test)

In [None]:
"""
Analysis to perform to see how good the fit was:

scatter plot
if scatter plot is pretty close to each other and linear then good fit (straight line)
plt.scatter(y_test, y_pred)

distplot (histogram)
sns.distplot(y_test-y_pred)
if normally distributed then good choice of model (linear regression in this case)

evaluation metrics:
MAE - mean absolute error
MSE - mean squared error
RMSE - root mean squared error (y units)

want to minimize all these errors
"""

print("MAE: ", metrics.mean_absolute_error(y_test, y_pred))
print("MSE: ", metrics.mean_squared_error(y_test, y_pred))
print("RMSE: ", np.sqrt(metrics.mean_squared_error(y_test, y_pred)))

MAE:  0.9130797718517208
MSE:  1.877501503342836
RMSE:  1.3702195091819545


In [None]:
df = stock_info.copy()
df = df[df.index > '2005']

X = df.drop(columns='Close', axis=1)
y = df['Close']

In [None]:
from sklearn.ensemble import RandomForestRegressor

rf = RandomForestRegressor(n_estimators=100)
rf.fit(X,y)

In [None]:
# Number of features you want to keep
num_features_to_keep = 5

# Get the indices of the top features, (slice indices to last 5)
top_indices = np.argsort(rf.feature_importances_)[-num_features_to_keep:]

# Extract the names of the top features
top_features = [X.columns[i] for i in top_indices]

X = X[top_features]

In [None]:
# Splitting the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=101)

# Training the Linear Regression model
lm2 = LinearRegression()
lm2.fit(X_train, y_train)

# Making predictions
y_pred = lm2.predict(X_test)

# Evaluation
print("MAE: ", metrics.mean_absolute_error(y_test, y_pred))
print("MSE: ", metrics.mean_squared_error(y_test, y_pred))
print("RMSE: ", np.sqrt(metrics.mean_squared_error(y_test, y_pred)))


MAE:  0.27203683287246944
MSE:  0.17241789888786677
RMSE:  0.4152323432584061


In [None]:
df = stock_info.copy()
df = df[df.index > '2005']

X = df.drop(columns='Close', axis=1)
y = df['Close']

In [None]:
"""
Apply PCA to reduce features

"""
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

#Scaling is applied, this ensures that feature contributes equally to the analysis
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

#PCA reduces dimensionality of a dataset while retaining as much variance. Finds principal components that varies the most.
#n_components = 5 is asking for 5 new axes where data projected onto these axes show maximum ariance
pca = PCA(n_components=9)
X_pca = pca.fit_transform(X_scaled)
print("Explained variance ratio:", pca.explained_variance_ratio_)

cumulative_variance = pca.explained_variance_ratio_.cumsum()
n_components_95 = (cumulative_variance < .95).sum()+1

print("Total < .95 components: ", n_components_95)

Explained variance ratio: [0.61790003 0.11924491 0.10997877 0.05258369 0.03947525 0.03458071
 0.01662693 0.00562377 0.00213943]
Total < .95 components:  6


In [None]:
X_train, X_test, y_train, y_test = train_test_split(X_pca, y, test_size=0.4, random_state=101)

In [None]:
model = LinearRegression()
model.fit(X_train, y_train)

y_pred = model.predict(X_test)

print("MAE: ", metrics.mean_absolute_error(y_test, y_pred))
print("MSE: ", metrics.mean_squared_error(y_test, y_pred))
print("RMSE: ", np.sqrt(metrics.mean_squared_error(y_test, y_pred)))

MAE:  0.6512982976567215
MSE:  0.8215647590292174
RMSE:  0.906402095666828


**Time Series Forecasting with ARIMA:** Utilize ARIMA model and making data stationary to predict future values

In [None]:
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller

In [None]:
#recategorize data to just business days
#then create monthly and quarterly dataframes to use later on
data_df = stock_info[['Close']].resample('B').ffill()
if data_df.index.freq is None:
    data_df.index.freq = 'B'

monthly_df = data_df.resample('M').last()
quarterly_df = data_df.resample('Q').last()

In [None]:
# test if a dataframe is stationary or not
def test_stationarity(timeseries):
    print('Results of Dickey-Fuller Test:')
    dftest = adfuller(timeseries, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
    for key,value in dftest[4].items():
        dfoutput['Critical Value (%s)' % key] = value
    print(dfoutput)

In [None]:
#test our current dataframe to see if it is stationary or not
test_stationarity(data_df['Close'])

Results of Dickey-Fuller Test:
Test Statistic                     3.269363
p-value                            1.000000
#Lags Used                        40.000000
Number of Observations Used    11204.000000
Critical Value (1%)               -3.430934
Critical Value (5%)               -2.861798
Critical Value (10%)              -2.566907
dtype: float64


In [None]:
#Apply diff to dataframe and check if data is stationary
data_df['Close_diff'] = data_df['Close'].diff()
data_df.dropna(inplace=True)
test_stationarity(data_df['Close_diff'])

Results of Dickey-Fuller Test:
Test Statistic                -1.768880e+01
p-value                        3.580102e-30
#Lags Used                     4.000000e+01
Number of Observations Used    1.120300e+04
Critical Value (1%)           -3.430934e+00
Critical Value (5%)           -2.861798e+00
Critical Value (10%)          -2.566907e+00
dtype: float64


In [None]:
#Apply log to dataframe and check if data is stationary
data_df['Close_log'] = np.log(data_df['Close'])
test_stationarity(data_df['Close_log'])

Results of Dickey-Fuller Test:
Test Statistic                     0.320468
p-value                            0.978264
#Lags Used                        16.000000
Number of Observations Used    11227.000000
Critical Value (1%)               -3.430933
Critical Value (5%)               -2.861797
Critical Value (10%)              -2.566907
dtype: float64


In [None]:
#Apply diff and log to dataframe to check if data is stationary
data_df['Close_diff_log'] = data_df['Close_log'].diff()
data_df.dropna(inplace=True)
test_stationarity(data_df['Close_diff_log'])


Results of Dickey-Fuller Test:
Test Statistic                   -24.803647
p-value                            0.000000
#Lags Used                        15.000000
Number of Observations Used    11227.000000
Critical Value (1%)               -3.430933
Critical Value (5%)               -2.861797
Critical Value (10%)              -2.566907
dtype: float64


In [None]:
#Apply diff and log to dataframe to check if data is stationary
monthly_df['Close_diff'] = monthly_df['Close'].diff()
monthly_df['Close_log'] = np.log(monthly_df['Close'])
monthly_df['Close_diff_log'] = monthly_df['Close_log'].diff()
monthly_df.dropna(inplace=True)
test_stationarity(monthly_df['Close_diff_log'])

Results of Dickey-Fuller Test:
Test Statistic                 -21.272028
p-value                          0.000000
#Lags Used                       0.000000
Number of Observations Used    516.000000
Critical Value (1%)             -3.443087
Critical Value (5%)             -2.867158
Critical Value (10%)            -2.569762
dtype: float64


In [None]:
#Apply diff and log to dataframe to check if data is stationary
quarterly_df['Close_diff'] = quarterly_df['Close'].diff()
quarterly_df['Close_log'] = np.log(quarterly_df['Close'])
quarterly_df['Close_diff_log'] = quarterly_df['Close_log'].diff()
quarterly_df.dropna(inplace=True)
test_stationarity(quarterly_df['Close_diff_log'])

Results of Dickey-Fuller Test:
Test Statistic                -1.312670e+01
p-value                        1.523870e-24
#Lags Used                     0.000000e+00
Number of Observations Used    1.720000e+02
Critical Value (1%)           -3.468952e+00
Critical Value (5%)           -2.878495e+00
Critical Value (10%)          -2.575809e+00
dtype: float64


In [None]:
data_df_final = data_df[['Close_diff_log']].copy()
monthly_df_final = monthly_df[['Close_diff_log']].copy()
quarterly_df_final = quarterly_df[['Close_diff_log']].copy()

In [None]:
from statsmodels.tsa.arima.model import ARIMA
import pmdarima as pm
def auto_arima_model(df, seasonal, m, d, D, max_p, max_q, trace, error_action, suppress_warnings):
    """
    Finds the best ARIMA model parameters using auto_arima.

    Parameters:
    - df: Pandas Series, the time series data.
    - seasonal: bool, whether the model includes seasonal component.
    - m: int, the number of periods in each season.
    - d: int, degree of differencing.
    - D: int, seasonal differencing.
    - max_p: int, maximum p value.
    - max_q: int, maximum q value.
    - trace: bool, whether to print status during the stepwise search.
    - error_action: str, action to take if an error occurs.
    - suppress_warnings: bool, whether to suppress warnings.

    Returns:
    - model: The best ARIMA model found.
    """
    model = pm.auto_arima(df, seasonal=seasonal, m=m, d=d, D=D, max_p=max_p, max_q=max_q,
                       trace=trace, error_action=error_action, suppress_warnings=suppress_warnings)
    return model

In [None]:
def fit_arima_model(df, order):
    """
    Fits an ARIMA model to the time series data.

    Parameters:
    - df: Pandas Series, the time series data.
    - order: tuple, the (p, d, q) order of the model.

    Returns:
    - fitted_model: The fitted ARIMA model.
    """
    arima_model = ARIMA(df, order=order)
    fitted_model = arima_model.fit()
    return fitted_model

In [None]:
def forecast_model(model, steps):
    """
    Generates future forecasts based on the provided ARIMA model.

    Parameters:
    - model (ARIMAResultsWrapper): The fitted ARIMA model from which to generate forecasts.
    - steps (int): The number of future time steps to forecast.

    Returns:
    - forecasts (np.ndarray): An array of forecasted values for the specified number of steps ahead.
    """
    return model.forecast(steps=steps)


In [None]:
def reverse_transform_and_forecast(data, forecasts, frequency):
    """
    Reverses the transformations applied to time series data and generates forecasted values
    in the original scale.

    Parameters:
    - data: The original time series data used for the ARIMA model (log-transformed and differenced).
    - forecasts: The forecasted values from the ARIMA model.
    - frequency: The frequency of the original time series data ('D' for daily, 'M' for monthly, 'Q' for quarterly).

    Returns:
    - forecast_df: DataFrame containing forecast dates and forecasted values in the original scale.
    """
    # Ensure data doesn't contain zero or negative values
    if any(data <= 0):
        raise ValueError("Data contains zero or negative values, which cannot be log-transformed.")

    # Get the last known value of the log-transformed series
    last_log_value = np.log(data).iloc[-1]

    # Reverse the differencing
    forecasts_reversed_diff = last_log_value + forecasts.cumsum()

    # Reverse the log transformation
    forecasts_original_scale = np.exp(forecasts_reversed_diff)

    # Generate forecast dates
    last_date = data.index[-1]
    forecast_dates = pd.date_range(start=last_date, periods=len(forecasts), freq=frequency)

    # Create a DataFrame with forecast dates and values
    forecast_df = pd.DataFrame({'Date': forecast_dates, 'Forecasted_Close': forecasts_original_scale})

    return forecast_df


In [None]:
def reverse_transform_and_forecast(data, forecasts, frequency):
    """
    Reverses the transformations applied to time series data and generates forecasted values
    in the original scale.

    Parameters:
    - data: The original time series data used for the ARIMA model (log-transformed and differenced).
    - forecasts: The forecasted values from the ARIMA model.
    - frequency: The frequency of the original time series data ('D' for daily, 'M' for monthly, 'Q' for quarterly).

    Returns:
    - forecast_df: DataFrame containing forecast dates and forecasted values in the original scale.
    """
    # Ensure data doesn't contain zero or negative values
    # Get the last known value of the log-transformed series
    last_log_value = data['Close_log'][-1]

    # Reverse the differencing
    forecasts_reversed_diff = last_log_value + forecasts.cumsum()

    # Reverse the log transformation
    forecasts_original_scale = np.exp(forecasts_reversed_diff)

    # Generate forecast dates
    last_date = data.index[-1]
    forecast_dates = pd.date_range(start=last_date, periods=len(forecasts), freq=frequency)

    # Create a DataFrame with forecast dates and values
    forecast_df = pd.DataFrame({'Date': forecast_dates, 'Forecasted_Close': forecasts_original_scale})

    return forecast_df

In [None]:
def final_output(df_final, df_original, freq):
  """
  Outputs the final dataframe of forecasted values (daily, monthly quarterly)

  Parameters:
  - df_final: Final dataframe to be inputted (daily, monthly, quarterly)
  - df_original: original data frame prior to data being transformed
  - freq: daily, monthly, quarterly data

  Returns:
  - output_df: DataFrame containing forecast value by date frequency
  """
  model = auto_arima_model(df_final['Close_diff_log'], False, 1, 0, 0, 5, 5, True, 'ignore', True)
  arima_model_fitted = fit_arima_model(df_final, model.order)
  forecast_output = forecast_model(arima_model_fitted, 5)
  output_df = reverse_transform_and_forecast(df_original, forecast_output, freq)
  return output_df

In [None]:
#Remember, the analysis provided here is for educational purposes only and should not be considered financial advice.

In [None]:
#daily
final_output(data_df_final, data_df, 'B')

Performing stepwise search to minimize aic
 ARIMA(2,0,2)(0,0,0)[0]             : AIC=-48537.838, Time=11.10 sec
 ARIMA(0,0,0)(0,0,0)[0]             : AIC=-48528.802, Time=1.66 sec
 ARIMA(1,0,0)(0,0,0)[0]             : AIC=-48528.816, Time=1.81 sec
 ARIMA(0,0,1)(0,0,0)[0]             : AIC=-48528.880, Time=5.53 sec
 ARIMA(1,0,2)(0,0,0)[0]             : AIC=-48528.590, Time=10.42 sec
 ARIMA(2,0,1)(0,0,0)[0]             : AIC=-48527.964, Time=5.22 sec
 ARIMA(3,0,2)(0,0,0)[0]             : AIC=-48541.429, Time=4.48 sec
 ARIMA(3,0,1)(0,0,0)[0]             : AIC=-48538.017, Time=2.87 sec
 ARIMA(4,0,2)(0,0,0)[0]             : AIC=-48540.082, Time=10.47 sec
 ARIMA(3,0,3)(0,0,0)[0]             : AIC=-48539.756, Time=1.70 sec
 ARIMA(2,0,3)(0,0,0)[0]             : AIC=-48541.085, Time=1.61 sec
 ARIMA(4,0,1)(0,0,0)[0]             : AIC=-48542.141, Time=3.45 sec
 ARIMA(4,0,0)(0,0,0)[0]             : AIC=-48544.198, Time=2.72 sec
 ARIMA(3,0,0)(0,0,0)[0]             : AIC=-48539.906, Time=6.26 sec
 A

Unnamed: 0,Date,Forecasted_Close
2024-01-19,2024-01-18,187.273042
2024-01-22,2024-01-19,187.304695
2024-01-23,2024-01-22,187.221224
2024-01-24,2024-01-23,187.419566
2024-01-25,2024-01-24,187.616668


In [None]:
#Remember, the analysis provided here is for educational purposes only and should not be considered financial advice.

In [None]:
#monthly
final_output(monthly_df_final, data_df, 'M')

Performing stepwise search to minimize aic
 ARIMA(2,0,2)(0,0,0)[0]             : AIC=-632.155, Time=0.92 sec
 ARIMA(0,0,0)(0,0,0)[0]             : AIC=-636.276, Time=0.08 sec
 ARIMA(1,0,0)(0,0,0)[0]             : AIC=-637.327, Time=0.10 sec
 ARIMA(0,0,1)(0,0,0)[0]             : AIC=-637.140, Time=0.09 sec
 ARIMA(2,0,0)(0,0,0)[0]             : AIC=-635.863, Time=0.12 sec
 ARIMA(1,0,1)(0,0,0)[0]             : AIC=-635.900, Time=0.22 sec
 ARIMA(2,0,1)(0,0,0)[0]             : AIC=inf, Time=0.60 sec
 ARIMA(1,0,0)(0,0,0)[0] intercept   : AIC=-640.308, Time=0.10 sec
 ARIMA(0,0,0)(0,0,0)[0] intercept   : AIC=-640.052, Time=0.09 sec
 ARIMA(2,0,0)(0,0,0)[0] intercept   : AIC=-638.567, Time=0.37 sec
 ARIMA(1,0,1)(0,0,0)[0] intercept   : AIC=-638.550, Time=0.40 sec
 ARIMA(0,0,1)(0,0,0)[0] intercept   : AIC=-640.209, Time=0.15 sec
 ARIMA(2,0,1)(0,0,0)[0] intercept   : AIC=-636.344, Time=0.36 sec

Best model:  ARIMA(1,0,0)(0,0,0)[0] intercept
Total fit time: 3.621 seconds


Unnamed: 0,Date,Forecasted_Close
2024-02-29,2024-01-31,189.048278
2024-03-31,2024-02-29,191.623962
2024-04-30,2024-03-31,194.268519
2024-05-31,2024-04-30,196.951831
2024-06-30,2024-05-31,199.672356


In [None]:
#Remember, the analysis provided here is for educational purposes only and should not be considered financial advice.

In [None]:
#quarterly
final_output(quarterly_df_final, data_df, 'Q')

Performing stepwise search to minimize aic
 ARIMA(2,0,2)(0,0,0)[0]             : AIC=inf, Time=0.42 sec
 ARIMA(0,0,0)(0,0,0)[0]             : AIC=12.497, Time=0.05 sec
 ARIMA(1,0,0)(0,0,0)[0]             : AIC=14.357, Time=0.03 sec
 ARIMA(0,0,1)(0,0,0)[0]             : AIC=14.352, Time=0.04 sec
 ARIMA(1,0,1)(0,0,0)[0]             : AIC=inf, Time=0.24 sec
 ARIMA(0,0,0)(0,0,0)[0] intercept   : AIC=9.732, Time=0.07 sec
 ARIMA(1,0,0)(0,0,0)[0] intercept   : AIC=11.732, Time=0.05 sec
 ARIMA(0,0,1)(0,0,0)[0] intercept   : AIC=11.732, Time=0.07 sec
 ARIMA(1,0,1)(0,0,0)[0] intercept   : AIC=13.732, Time=0.09 sec

Best model:  ARIMA(0,0,0)(0,0,0)[0] intercept
Total fit time: 1.088 seconds


Unnamed: 0,Date,Forecasted_Close
2024-06-30,2024-03-31,194.847584
2024-09-30,2024-06-30,203.024498
2024-12-31,2024-09-30,211.544561
2025-03-31,2024-12-31,220.422175
2025-06-30,2025-03-31,229.672344
