In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
import math
from datetime import datetime, timedelta, date
import time
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.ticker as plticker
import matplotlib.patches as patches
from matplotlib.colors import TwoSlopeNorm
import seaborn as sns
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from mapping_tickers import *
from utils import *
from mapping_plot_attributes import *
from mapping_portfolio_downloads import *
from analyze_prices import AnalyzePrices
from download_data import DownloadData

In [19]:
# Ticker under analysis and the market index handed to the downloader with it
tk = 'AAPL'
tk_market = '^GSPC'

# One-year look-back window ending today
end_date = datetime.today()
try:
    start_date = datetime(end_date.year - 1, end_date.month, end_date.day)
except ValueError:
    # Today is 29 February and the previous year has no such day: use 28 February instead
    start_date = datetime(end_date.year - 1, end_date.month, 28)

hist_data = DownloadData(end_date, start_date, [tk], tk_market)
analyze_prices = AnalyzePrices(end_date, start_date, [tk])

# The downloaded result is indexed below by 'Adj Close', 'Close' and 'OHLC'
ticker_data = hist_data.download_yh_data(start_date, end_date, [tk], tk_market)

df_adj_close = ticker_data['Adj Close']
df_close = ticker_data['Close']
df_ohlc = ticker_data['OHLC'][tk]

# Per-ticker series used by the indicator and plotting functions
ohlc_tk = df_ohlc.copy()
adj_close_tk = df_adj_close[tk]
close_tk = df_close[tk]
open_tk = ohlc_tk['Open']
high_tk = ohlc_tk['High']
low_tk = ohlc_tk['Low']

# Display name of a price type -> the matching series for tk
price_type_map = {
    'Adj Close': adj_close_tk,
    'Adjusted Close': adj_close_tk,
    'Close': close_tk,
    'Open': open_tk,
    'High': high_tk,
    'Low': low_tk
}

# Plot theme; the alternative used elsewhere in this notebook is 'light'
theme = 'dark'
style = theme_style[theme]

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed


In [16]:
def macd(
    df_tk,
    signal_window = 9
):
    """
    Compute the MACD line and its signal line for a price series.

    df_tk:
        a series of price values, taken as a column of df_close or df_adj_close for ticker tk
    signal_window:
        span of the EMA applied to the MACD line to obtain the signal line

    Returns a dictionary with keys
        'MACD': EMA 12 of df_tk minus EMA 26 of df_tk
        'MACD Signal': EMA of the MACD line with span signal_window
        'MACD Signal Window': signal_window as passed in

    Raises TypeError if df_tk is not a pandas Series.
    """

    if not isinstance(df_tk, pd.Series):
        # A bare `exit` expression does nothing, so invalid input must be rejected explicitly
        raise TypeError(
            f'Incorrect format of input data: expected a pandas Series, got {type(df_tk).__name__}'
        )

    ema_26 = df_tk.ewm(span = 26).mean()
    ema_12 = df_tk.ewm(span = 12).mean()
    macd_line = ema_12 - ema_26

    macd_signal = macd_line.ewm(span = signal_window).mean()

    macd_data = {
        'MACD': macd_line,
        'MACD Signal': macd_signal,
        'MACD Signal Window': signal_window
    }

    return macd_data

In [17]:
def plot_macd_plotly(
    tk_macd,
    macd_data,
    df_tk,
    n_ticks_max = 48,
    n_yticks_max = 16,
    plot_width = 1450,
    plot_height = 750,
    title_font_size = 32,
    theme = 'dark',
    overlay_price = True,
    price_type = 'close'
):
    """
    MACD plot with a signal line and the original price line overlayed, if desired

    tk_macd:
        ticker name shown in the plot title
    macd_data:
        dictionary with keys 'MACD', 'MACD Signal' and 'MACD Signal Window', as returned by macd()
    df_tk:
        price series drawn on the secondary y axis when overlay_price is True;
        it is plotted against the index of the MACD series
    n_ticks_max, n_yticks_max:
        passed as nticks to the x axis and to the primary y axis
    plot_width, plot_height, title_font_size:
        figure size and title font size
    theme:
        key into theme_style
    overlay_price:
        if True, df_tk is added on a secondary y axis
    price_type:
        Normally 'adjusted close' or 'close' or whatever MACD is based on; used only to
        label the overlay. Matching is case-insensitive and an unrecognised value is
        labelled 'Adjusted Close'.

    Returns the plotly figure.
    """

    macd = macd_data['MACD']
    macd_signal = macd_data['MACD Signal']
    macd_signal_window = macd_data['MACD Signal Window']

    x_min = str(macd.index.min().date())
    x_max = str(macd.index.max().date())

    # Series.min()/max() skip NaN; the builtin min()/max() give order-dependent results with NaN
    y_macd_min, y_macd_max = set_axis_limits(macd.min(), macd.max())

    # Split into two bar series so that each sign gets its own colour; the other sign becomes NaN
    macd_positive = macd.where(macd >= 0)
    macd_negative = macd.where(macd < 0)

    title_macd = f'{tk_macd} Moving Average Convergence Divergence (EMA 12-26)'

    price_types = ['adjusted close', 'adj close', 'close', 'open', 'high', 'low']
    # Normalise so that e.g. 'Close' or 'Adj Close' is not silently labelled 'Adjusted Close'
    price_key = str(price_type).strip().lower()
    if price_key in price_types:
        price_name = 'Adjusted Close' if price_key == 'adj close' else price_key.title()
    else:
        price_name = 'Adjusted Close'

    style = theme_style[theme]

    if overlay_price:
        fig_macd = make_subplots(specs=[[{'secondary_y': True}]])
    else:
        fig_macd = make_subplots(rows = 1, cols = 1)

    fig_macd.add_trace(
        go.Bar(
            x = macd_positive.index.astype(str),
            y = macd_positive,
            marker_color = style['green_color'],
            width = 1,
            name = 'MACD > 0'
        ),
        secondary_y = False
    )
    fig_macd.add_trace(
        go.Bar(
            x = macd_negative.index.astype(str),
            y = macd_negative,
            marker_color = style['red_color'],
            width = 1,
            name = 'MACD < 0'
        ),
        secondary_y = False
    )
    fig_macd.add_trace(
        go.Scatter(
            x = macd_signal.index.astype(str),
            y = macd_signal,
            line = dict(color = style['signal_color']),
            name = f'MACD EMA {macd_signal_window} Signal'
        ),
        secondary_y = False
    )
    if overlay_price:
        fig_macd.add_trace(
            go.Scatter(
                x = macd.index.astype(str),
                y = df_tk,
                line = dict(color = style['basecolor']),
                name = price_name
            ),
            secondary_y = True
        )

    # Add plot border
    fig_macd.add_shape(
        type = 'rect',
        xref = 'x',  # use 'x' because of secondary axis - 'paper' does not work correctly
        yref = 'paper',
        x0 = x_min,
        x1 = x_max,
        y0 = 0,
        y1 = 1,
        line_color = style['x_linecolor'],
        line_width = 0.3
    )
    # Update layout and axes
    fig_macd.update_layout(
        width = plot_width,
        height = plot_height,
        xaxis_rangeslider_visible = False,
        template = style['template'],
        title = dict(
            text = title_macd,
            font_size = title_font_size,
            y = 0.95,
            x = 0.45,
            xanchor = 'center',
            yanchor = 'top'
        )
    )
    fig_macd.update_yaxes(
        title_text = 'MACD',
        range = (y_macd_min, y_macd_max),
        secondary_y = False,
        gridcolor = style['y_gridcolor'],
        nticks = n_yticks_max,
        ticks = 'outside',
        ticklen = 8,
        ticklabelshift = 5,  # not working
        ticklabelstandoff = 10  # not working
    )
    if overlay_price:
        fig_macd.update_yaxes(
            title_text = price_name,
            secondary_y = True,
            ticks = 'outside',
            ticklen = 8,
            ticklabelshift = 5,  # not working
            ticklabelstandoff = 10,  # not working
            showgrid = False
        )
    # Dates are plotted as category strings, so the x axis is a category axis
    fig_macd.update_xaxes(
        type = 'category',
        nticks = n_ticks_max,
        tickangle = -90,
        gridcolor = style['x_gridcolor'],
        ticks = 'outside',
        ticklen = 8,
        ticklabelshift = 5,  # not working
        ticklabelstandoff = 10,  # not working
        showgrid = True
    )

    return fig_macd

In [20]:
macd_data = macd(adj_close_tk)

# price_type only labels the overlay, so it must name the series actually passed in;
# leaving the default 'close' would label the adjusted-close line as 'Close'
fig_macd = plot_macd_plotly(tk, macd_data, adj_close_tk, theme = theme, price_type = 'adj close')
fig_macd.show()

In [6]:
def weighted_mean(values):
    """
    Linearly weighted mean: the i-th value (counting from 1) gets weight i,
    so the last value weighs the most.

    values: a list, tuple or series of numerical values
    """
    series = pd.Series(values) if isinstance(values, (list, tuple)) else values

    count = len(series)
    linear_weights = range(1, count + 1)
    # 1 + 2 + ... + count
    total_weight = count * (count + 1) / 2

    return (series @ linear_weights) / total_weight

In [7]:
def moving_average(
    df_tk,
    ma_type,
    ma_window,
    min_periods = 1
):
    """
    Compute a moving average of a price series.

    df_tk:
        a series of price values, taken as a column of df_close or df_adj_close for ticker tk
    ma_type:
        simple ('sma'),
        exponential ('ema'),
        double exponential ('dema'),
        triple exponential ('tema'),
        weighted ('wma');
        any other value is treated as 'sma'
    ma_window:
        length in days; used as the span for the exponential types and as the
        rolling window for 'sma' and 'wma'
    min_periods:
        minimum number of observations in a rolling window; applies to 'sma' and 'wma' only

    Returns ma

    Raises TypeError if df_tk is not a pandas Series.
    """

    if not isinstance(df_tk, pd.Series):
        # A bare `exit` expression does nothing, so invalid input must be rejected explicitly
        raise TypeError(
            f'Incorrect format of input data: expected a pandas Series, got {type(df_tk).__name__}'
        )

    if ma_type in ['ema', 'dema', 'tema']:
        ema_1 = df_tk.ewm(span = ma_window).mean()
        if ma_type == 'ema':
            ma = ema_1
        else:
            ema_2 = ema_1.ewm(span = ma_window).mean()
            if ma_type == 'dema':
                # DEMA = 2 * EMA - EMA(EMA); smoothing twice alone only adds lag
                ma = 2 * ema_1 - ema_2
            else:
                # TEMA = 3 * EMA - 3 * EMA(EMA) + EMA(EMA(EMA))
                ema_3 = ema_2.ewm(span = ma_window).mean()
                ma = 3 * ema_1 - 3 * ema_2 + ema_3

    elif ma_type == 'wma':
        ma = df_tk.rolling(window = ma_window, min_periods = min_periods).apply(weighted_mean)

    else:  # 'sma' or anything else
        ma = df_tk.rolling(window = ma_window, min_periods = min_periods).mean()

    return ma

In [21]:
def plot_diff_plotly(
    tk,
    diff_data,
    price_type_map,
    reverse_diff = False,
    add_signal = True,
    n_ticks_max = 48,
    n_yticks_max = 16,
    plot_width = 1450,
    plot_height = 750,
    title_font_size = 32,
    theme = 'dark'
):
    """
    Plot the difference (oscillator) between two price or moving-average series.

    tk:
        ticker name shown in the plot title
    diff_data:
        dictionary with keys
            'p_base': price type the moving averages are computed from
            'p1_type', 'p2_type': a price type or a moving-average type
            'p1_window', 'p2_window': moving-average windows
            'signal_type', 'signal_window': moving average applied to the difference;
                read only if add_signal is True
        Price types: 'adjusted close', 'adj close', 'close', 'open', 'high', 'low'
        Moving-average types: 'sma', 'ema', 'dema', 'tema', 'wma'
        Type names are matched case-insensitively.
    price_type_map = {
        'Adj Close': adj_close_tk,
        'Adjusted Close': adj_close_tk,
        'Close': close_tk,
        'Open': open_tk,
        'High': high_tk,
        'Low': low_tk
    }
    reverse_diff:
        if True, the (p2 - p1) difference will be used instead of (p1 - p2)
    add_signal:
        if True, a signal will be added that is a moving average of the calculated difference
    n_ticks_max, n_yticks_max:
        passed as nticks to the x axis and to the y axis
    plot_width, plot_height, title_font_size:
        figure size and title font size
    theme:
        key into theme_style

    Returns the plotly figure.

    Raises ValueError if 'p1_type' or 'p2_type' is neither a price type nor a moving-average type.
    """

    base = diff_data['p_base']
    p_base_name = base.title()
    p_base = price_type_map[p_base_name]

    p1, p1_name = _resolve_diff_series(diff_data['p1_type'], diff_data['p1_window'], p_base, price_type_map)
    p2, p2_name = _resolve_diff_series(diff_data['p2_type'], diff_data['p2_window'], p_base, price_type_map)

    if not reverse_diff:
        diff = p1 - p2
        diff_title = f'{tk} {p_base_name} {p1_name} - {p2_name} Oscillator'
        diff_positive_name = f'{p1_name} > {p2_name}'
        diff_negative_name = f'{p1_name} < {p2_name}'
    else:
        diff = p2 - p1
        diff_title = f'{tk} {p_base_name} {p2_name} - {p1_name} Oscillator'
        diff_positive_name = f'{p2_name} > {p1_name}'
        diff_negative_name = f'{p2_name} < {p1_name}'

    x_min = str(diff.index.min().date())
    x_max = str(diff.index.max().date())

    # Series.min()/max() skip NaN; the builtin min()/max() give order-dependent results with NaN
    y_diff_min, y_diff_max = set_axis_limits(diff.min(), diff.max())

    # A row is a crossing when its sign differs from the previous row's sign.
    # The first row has no predecessor and is never a crossing.
    diff_sign = np.sign(diff)
    crossing = diff_sign.ne(diff_sign.shift())
    crossing.iloc[:1] = False

    # Each copy keeps only its own sign (NaN elsewhere); both are set to 0 at a crossing
    # so that the two filled areas meet on the zero line
    diff_positive = diff.where(diff >= 0).mask(crossing, 0)
    diff_negative = diff.where(diff < 0).mask(crossing, 0)

    style = theme_style[theme]

    fig_diff = make_subplots(rows = 1, cols = 1)

    fig_diff.add_trace(
        go.Scatter(
            x = diff_positive.index.astype(str),
            y = diff_positive,
            line_color = style['diff_green_linecolor'],
            line_width = 2,
            fill = 'tozeroy',
            fillcolor = style['diff_green_fillcolor'],
            name = diff_positive_name
        )
    )
    fig_diff.add_trace(
        go.Scatter(
            x = diff_negative.index.astype(str),
            y = diff_negative,
            line_color = 'darkred',
            line_width = 2,
            fill = 'tozeroy',
            fillcolor = style['diff_red_fillcolor'],
            name = diff_negative_name
        )
    )
    if add_signal:
        # The signal settings are read only here, so they may be omitted when no signal is drawn
        signal_type = str(diff_data['signal_type']).strip().lower()
        signal_window = diff_data['signal_window']
        diff_signal = moving_average(diff, signal_type, signal_window)
        signal_name = f'Diff {signal_type.upper()} {signal_window} Signal'

        fig_diff.add_trace(
            go.Scatter(
                x = diff_signal.index.astype(str),
                y = diff_signal,
                line_color = style['signal_color'],
                line_width = 2,
                name = signal_name
            )
        )

    # Add plot border
    fig_diff.add_shape(
        type = 'rect',
        xref = 'x',  # use 'x' because of secondary axis - 'paper' does not work correctly
        yref = 'paper',
        x0 = x_min,
        x1 = x_max,
        y0 = 0,
        y1 = 1,
        line_color = style['x_linecolor'],
        line_width = 0.3
    )
    # Update layout and axes
    fig_diff.update_layout(
        width = plot_width,
        height = plot_height,
        xaxis_rangeslider_visible = False,
        template = style['template'],
        title = dict(
            text = diff_title,
            font_size = title_font_size,
            y = 0.95,
            x = 0.45,
            xanchor = 'center',
            yanchor = 'top'
        )
    )
    fig_diff.update_yaxes(
        title_text = 'Oscillator',
        range = (y_diff_min, y_diff_max),
        secondary_y = False,
        nticks = n_yticks_max,
        gridcolor = style['y_gridcolor'],
        ticks = 'outside',
        ticklen = 8,
        ticklabelshift = 5,  # not working
        ticklabelstandoff = 10,  # not working
    )
    # Dates are plotted as category strings, so the x axis is a category axis
    fig_diff.update_xaxes(
        type = 'category',
        nticks = n_ticks_max,
        tickangle = -90,
        gridcolor = style['x_gridcolor'],
        ticks = 'outside',
        ticklen = 8,
        ticklabelshift = 5,  # not working
        ticklabelstandoff = 10,  # not working
        showgrid = True
    )

    return fig_diff


def _resolve_diff_series(series_type, window, p_base, price_type_map):
    """
    Return (series, display name) for one side of the oscillator difference.

    series_type:
        a price type or a moving-average type, matched case-insensitively
    window:
        moving-average window; ignored for price types
    p_base:
        series the moving average is computed from
    price_type_map:
        dictionary from price display names to series

    Raises ValueError if series_type is neither a price type nor a moving-average type.
    """
    price_types = ['adjusted close', 'adj close', 'close', 'open', 'high', 'low']
    ma_types = ['sma', 'ema', 'dema', 'tema', 'wma']

    type_key = str(series_type).strip().lower()

    if type_key in price_types:
        name = 'Adjusted Close' if type_key == 'adj close' else type_key.title()
        try:
            series = price_type_map[name]
        except KeyError:
            # Price type missing from the map: fall back to the adjusted close series
            series = price_type_map['Adj Close']
        return series, name

    if type_key in ma_types:
        return moving_average(p_base, type_key, window), f'{type_key.upper()} {window}'

    raise ValueError(
        f'Unsupported series type {series_type!r}: expected one of {price_types + ma_types}'
    )

In [11]:
# Oscillator settings read by plot_diff_plotly; the user supplies these values.
# Price types: 'adjusted close', 'adj close', 'close', 'open', 'high', 'low'
# Moving-average types: 'sma', 'ema', 'dema', 'tema', 'wma'
# Type names are written in lower case, matching the lists plot_diff_plotly checks them against.

diff_data = dict(
    p_base = 'close',       # price type the moving averages are computed from
    p1_type = 'ema',        # price type or moving-average type
    p2_type = 'wma',        # price type or moving-average type
    p1_window = 10,
    p2_window = 10,
    signal_type = 'ema',    # moving-average type applied to the difference to get the signal
    signal_window = 5
)

In [22]:
# Switch to the light theme for the oscillator plot (assigned once; the duplicate line was redundant)
theme = 'light'
fig_diff = plot_diff_plotly(tk, diff_data, price_type_map, add_signal = True, theme = theme)
fig_diff.show()