<a href="https://colab.research.google.com/github/mugurm/colab/blob/main/mugurm_stock_explorer_colab_edition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Stock Explorer
by mugurm

In [37]:
# Import the necessary libraries
import yfinance as yf
import holoviews as hv
import pandas as pd
import numpy as np
from holoviews import opts
import colorcet as cc
from scipy.signal import spectrogram
from scipy.fft import fft
import pywt
from functools import lru_cache
from datetime import datetime, timedelta
from ipywidgets import widgets, Layout
from IPython.display import display, clear_output


# Load the Bokeh plotting backend for HoloViews.
hv.extension('bokeh')

# Module-level placeholders. plot_stock binds its own local `prices` and
# `log_r`, so these stay None unless something else rebinds them.
prices = None
log_r = None

# Downloads are memoised per (symbol, start day, end day). plot_stock passes
# datetime.now()-based bounds, so a cache keyed on the exact timestamps would
# never hit and would only grow.
_DOWNLOAD_CACHE = {}
_DOWNLOAD_CACHE_MAX_ENTRIES = 32


def _cache_day(moment):
    """Reduce a datetime-like bound to its calendar day; pass other values through."""
    to_date = getattr(moment, 'date', None)
    return to_date() if callable(to_date) else moment


def download_data(symbol, start, end):
    """Download price history for `symbol` between `start` and `end`, with caching.

    Args:
        symbol: ticker passed straight to ``yf.download``; must be hashable.
        start: start bound passed to ``yf.download`` unchanged.
        end: end bound passed to ``yf.download`` unchanged.

    Returns:
        A DataFrame owned by the caller (a copy of the cached frame), or an
        empty DataFrame when the download raised or produced no rows.

    Results are cached by symbol and by the calendar day of each bound, so
    repeated calls on the same day reuse the first download. Empty or failed
    downloads are not cached, so a later call retries.
    """
    key = (symbol, _cache_day(start), _cache_day(end))
    cached = _DOWNLOAD_CACHE.get(key)
    if cached is not None:
        # Callers add columns in place; hand out a copy so the cache stays clean.
        return cached.copy()

    try:
        df = yf.download(symbol, start=start, end=end)
    except Exception as e:
        print(f"Failed to fetch data for {symbol}. Error: {str(e)}")
        return pd.DataFrame()  # Return an empty DataFrame on failure

    if df is None or df.empty:
        # Nothing usable came back; do not remember it.
        return pd.DataFrame()

    if len(_DOWNLOAD_CACHE) >= _DOWNLOAD_CACHE_MAX_ENTRIES:
        # dicts keep insertion order, so the first key is the oldest entry.
        _DOWNLOAD_CACHE.pop(next(iter(_DOWNLOAD_CACHE)))
    _DOWNLOAD_CACHE[key] = df
    return df.copy()


def calculate_midpoint(row):
    """Return the average of the 'High' and 'Low' entries of `row`."""
    high = row['High']
    low = row['Low']
    return (high + low) / 2


def percent_change(prices):
    """Return each step of `prices` as a percentage of the preceding element."""
    steps = np.diff(prices)
    bases = prices[:-1]
    return 100 * (steps / bases)


def log_returns(prices):
    """Return the differences between consecutive natural logs of `prices`."""
    log_prices = np.log(prices)
    return np.diff(log_prices)


def to_decibels(power):
    """Return ten times the base-10 logarithm of `power`."""
    exponent = np.log10(power)
    return 10 * exponent


def hv_scalogram(data, wavelet='cmor1.5-1.0', scales=None, cmap='jet'):
    """Build a HoloViews image of the wavelet-coefficient magnitudes of `data`.

    Args:
        data: 1-D sequence handed to ``pywt.cwt``.
        wavelet: wavelet name handed to ``pywt.cwt``.
        scales: scales to evaluate; defaults to ``np.arange(1, len(data), 0.1)``.
        cmap: colour map applied to the image.

    Returns:
        An ``hv.Image`` with 'Time' and 'Scale' key dimensions and a
        'Magnitude' value dimension whose range spans the observed magnitudes.
    """
    if scales is None:
        scales = np.arange(1, len(data), 0.1)

    # The second value returned by pywt.cwt (frequencies) is not used here.
    coefficients, _ = pywt.cwt(data, scales, wavelet)
    magnitude = np.abs(coefficients)

    time_axis = np.arange(len(data))
    magnitude_dim = hv.Dimension('Magnitude', range=(magnitude.min(), magnitude.max()))
    scalogram = hv.Image((time_axis, scales, magnitude), kdims=['Time', 'Scale'], vdims=magnitude_dim)

    # Styling is applied on the image object that is returned.
    scalogram.opts(cmap=cmap, bgcolor=None, colorbar=True, invert_yaxis=True, width=800, height=300, tools=['hover'])

    return scalogram


def calculate_percent_gain(buy_price=None, sell_price=None):
    """
    Return the change from buy_price to sell_price as a percentage of
    buy_price, rounded to two decimals.

    Prints a notice and returns None when either price is missing.
    """
    if buy_price is not None and sell_price is not None:
        relative_change = (sell_price - buy_price) / buy_price
        return round(relative_change * 100, 2)

    print("Both buy_price and sell_price need to be provided!")
    return None


def calculate_trailing_stops(df, trailing_stop_sell, trailing_stop_buy, offset, start_long):
    """Fill one trailing-level column of `df` in place, starting at row `offset`.

    When `start_long` is true the 'Trailing_Sell_Level' column is written: it
    starts at the offset row's midpoint reduced by `trailing_stop_sell` percent
    and afterwards never falls below each row's High reduced by the same
    percentage. Otherwise 'Trailing_Buy_Level' is written: it starts at the
    midpoint raised by `trailing_stop_buy` percent and afterwards never rises
    above each row's Low raised by the same percentage.

    Rows before `offset` are set to NaN in the written column. Returns `df`.
    """
    # Pick everything that depends on the direction once, up front.
    if start_long:
        level_col, price_col = 'Trailing_Sell_Level', 'High'
        factor = 1 - trailing_stop_sell / 100
        keep = max
    else:  # start_short
        level_col, price_col = 'Trailing_Buy_Level', 'Low'
        factor = 1 + trailing_stop_buy / 100
        keep = min

    # Seed the level from the midpoint of the offset row.
    df.loc[df.index[offset], level_col] = calculate_midpoint(df.iloc[offset]) * factor

    # Ratchet the level row by row from the day after the offset.
    for pos in range(offset + 1, len(df)):
        previous_level = df.loc[df.index[pos - 1], level_col]
        candidate = df.iloc[pos][price_col] * factor
        df.loc[df.index[pos], level_col] = keep(previous_level, candidate)

    # No level applies before the offset day.
    df.loc[df.index[:offset], level_col] = np.nan

    return df


def find_buy_and_sell_signals(df, trailing_buy_pct, trailing_sell_pct, offset, start_long):
    """Find the first trailing-level trigger from `offset` and the opposite trigger after it.

    Naming note: ``buy_*`` always describes the FIRST event found and
    ``sell_*`` the SECOND, whatever the direction.

    * ``start_long`` true: the first event is the first row (from ``offset``)
      whose Low is at or below 'Trailing_Sell_Level'. The second event is the
      first row, from that date on, whose High reaches a trailing buy level
      computed here as the running minimum Low times
      ``1 + trailing_buy_pct / 100``.
    * ``start_long`` false: the first event is the first row whose High is at
      or above 'Trailing_Buy_Level'. The second event is the first row whose
      Low falls to the running maximum High times
      ``1 - trailing_sell_pct / 100``.

    Only one of ``trailing_buy_pct`` / ``trailing_sell_pct`` is read per call
    (buy when long, sell when short). Reported prices are the trailing-level
    values on the event dates, not the row's High or Low.

    Args:
        df: frame with 'High' and 'Low' columns plus the starting level column
            ('Trailing_Sell_Level' when long, 'Trailing_Buy_Level' when short).
            Modified in place: the starting level column is set to NaN from
            the row after the first event onward.
        trailing_buy_pct: percentage used for the second level when long.
        trailing_sell_pct: percentage used for the second level when short.
        offset: positional index of the first row to examine.
        start_long: selects which of the two directions above is used.

    Returns:
        ``(buy_date, buy_price, sell_date, sell_price, df)``. Dates and prices
        are None for events that did not occur. The frame is a new merged
        frame when a first event was found, otherwise the input frame.
    """
    df_offset = df.iloc[offset:]  # only consider data from the offset day
    
    # First event: price touches the level that was trailing from the offset day.
    if start_long:
        buy_signal = (df_offset.Low <= df_offset.Trailing_Sell_Level)
    else:
        buy_signal = (df_offset.High >= df_offset.Trailing_Buy_Level)

    buy_date, buy_price, sell_date, sell_price = None, None, None, None

    if buy_signal.any():
        buy_date = buy_signal.idxmax()  # index label of the first True
        # The event price is the trailing level on that date, not the row's Low/High.
        buy_price = df.loc[buy_date, 'Trailing_Sell_Level'] if start_long else df.loc[buy_date, 'Trailing_Buy_Level'] 
        # Label slice: the working copy starts at (and includes) the event row.
        # NOTE(review): the second event can therefore fire on the same row - confirm intended.
        df_after_buy = df.loc[buy_date:].copy()

        # In the caller's frame, end the starting level after the event row
        # (presumably so its plotted line stops at the event).
        if len(df.loc[buy_date:].index) > 1:
            next_day = df.loc[buy_date:].index[1]
            df.loc[next_day:, 'Trailing_Sell_Level' if start_long else 'Trailing_Buy_Level'] = np.nan

        # Blank the starting level in the working copy; combine_first below
        # fills it back from df wherever df still holds a value.
        df_after_buy['Trailing_Sell_Level' if start_long else 'Trailing_Buy_Level'] = np.nan

        # Second event: the opposite level trails the running extreme since the first event.
        if start_long:
            df_after_buy['Trailing_Buy_Level'] = df_after_buy['Low'].expanding().min() * (1 + trailing_buy_pct / 100)
            sell_signal = (df_after_buy.High >= df_after_buy.Trailing_Buy_Level)
        else:
            df_after_buy['Trailing_Sell_Level'] = df_after_buy['High'].expanding().max() * (1 - trailing_sell_pct / 100)
            sell_signal = (df_after_buy.Low <= df_after_buy.Trailing_Sell_Level)
        
        if sell_signal.any():
            sell_date = sell_signal.idxmax()  # index label of the first True
            # Again the event price is the trailing level, not the row's High/Low.
            sell_price = df_after_buy.loc[sell_date, 'Trailing_Buy_Level'] if start_long else df_after_buy.loc[sell_date, 'Trailing_Sell_Level'] 

            # End the second level after its event row.
            if len(df_after_buy.loc[sell_date:].index) > 1:
                next_day = df_after_buy.loc[sell_date:].index[1]
                df_after_buy.loc[next_day:, 'Trailing_Buy_Level' if start_long else 'Trailing_Sell_Level'] = np.nan

        df = df_after_buy.combine_first(df)  # give priority to df_after_buy and merge the new trailing levels into the original dataframe
    return buy_date, buy_price, sell_date, sell_price, df


def _level_curve(df, column, label, color):
    """Return the curve for a trailing-level column, or an empty placeholder when `df` lacks it."""
    if column not in df.columns:
        return hv.Curve([])  # Empty placeholder to not disrupt the plot
    return hv.Curve(df, ('Date', 'Date'), (column, label)).opts(color=color)


def create_plot(df, buy_date, buy_price, sell_date, sell_price, symbol, buy_pct, sell_pct, offset, start_long):
    """Create and display the price / trailing-level overlay.

    Args:
        df: frame with 'High' and 'Low' columns, the starting level column
            ('Trailing_Sell_Level' when long, 'Trailing_Buy_Level' when
            short), and optionally the other level column.
        buy_date, buy_price: date and price of the first event, or None.
        sell_date, sell_price: date and price of the second event, or None.
        symbol: ticker shown in the title.
        buy_pct, sell_pct: trailing percentages shown in the title.
        offset: positional index of the row where the starting level begins.
        start_long: selects which level is the starting one and the colours.

    Either trailing-level column may be absent and either event may be
    missing; the affected plot elements are replaced by empty placeholders.
    Nothing is returned; the overlay is shown with ``display``.
    """
    price_curve = hv.Curve(df, ('Date', 'Date'), ('High', 'High')).opts(line_width=0.75, color='lightgray') * \
                  hv.Curve(df, ('Date', 'Date'), ('Low', 'Low')).opts(line_width=0.75, color='lightgray')

    # Either level column can be missing when no first event was found.
    sell_curve = _level_curve(df, 'Trailing_Sell_Level', 'Trailing Sell Level', 'red')
    buy_curve = _level_curve(df, 'Trailing_Buy_Level', 'Trailing Buy Level', 'green')

    # Dotted connector at the first event, from its price to the opposite
    # level on that date. Needs both an event and the opposite column.
    opposite_level = 'Trailing_Buy_Level' if start_long else 'Trailing_Sell_Level'
    if buy_date is not None and opposite_level in df.columns:
        buy_line = hv.Curve([(buy_date, buy_price), (buy_date, df.loc[buy_date, opposite_level])])\
            .opts(color='green' if start_long else 'red', line_dash='dotted')
    else:
        buy_line = hv.Curve([])  # Empty placeholder to not disrupt the plot

    # Dotted connector at the offset day, from the midpoint to the starting level.
    offset_date = df.index[offset]
    offset_midpoint = calculate_midpoint(df.loc[offset_date])
    level = 'Trailing_Sell_Level' if start_long else 'Trailing_Buy_Level'
    sell_line = hv.Curve([(offset_date, offset_midpoint), (offset_date, df.loc[offset_date, level])])\
        .opts(color='red' if start_long else 'green', line_dash='dotted')

    # Event markers; an explicit None test avoids relying on date truthiness.
    buy_point = hv.Scatter([(buy_date, buy_price)] if buy_date is not None else [])\
        .opts(color='red' if start_long else 'green', size=10)
    sell_point = hv.Scatter([(sell_date, sell_price)] if sell_date is not None else [])\
        .opts(color='green' if start_long else 'red', size=10)

    # Orange square on the midpoint of the day the starting level begins.
    first_trailing_stop_point = hv.Scatter([(offset_date, offset_midpoint)]).opts(color='orange', marker='square', size=10)

    # Construct the title string; the gain is only defined when both events exist.
    if buy_price is None or sell_price is None:
        gain_text = "Not sold"
    else:
        try:
            gain_text = f"{calculate_percent_gain(buy_price, sell_price)}%"
        except ZeroDivisionError:
            gain_text = "Not sold"
    title_str = f"{symbol} - TS Buy: {buy_pct}%, TS Sell: {sell_pct}%. ∆: {gain_text}."

    display((price_curve * sell_curve * buy_curve * buy_line * sell_line * buy_point * sell_point * first_trailing_stop_point).opts(width=800, height=350, title=title_str))


def plot_stock(symbol, days, trailing_stop_pct_buy, trailing_stop_pct_sell, days_offset, start_long, compute_spectrogram):
    """Fetch the last `days` calendar days of `symbol` and display the analysis.

    Args:
        symbol: ticker passed to ``download_data``.
        days: number of calendar days before now to request.
        trailing_stop_pct_buy: trailing buy percentage.
        trailing_stop_pct_sell: trailing sell percentage.
        days_offset: positional row index where the trailing level starts.
            Values past the last downloaded row are clamped to that row.
        start_long: True to start long, False to start short.
        compute_spectrogram: when true, also display the scalograms, the
            return curves and a summary table.

    Prints a message and returns early when no data is available.
    """
    end = datetime.now()
    start = end - timedelta(days=days)

    # Fetch data
    df = download_data(symbol, start, end)
    if df is None or df.empty:
        print(f"No data available for {symbol} in the given date range.")
        return

    # The slider range is only an estimate of the number of trading rows, so
    # the requested start row may not exist; fall back to the last row.
    last_row = len(df) - 1
    if days_offset > last_row:
        print(f"Start day {days_offset} is beyond the {len(df)} downloaded rows; using {last_row} instead.")
        days_offset = last_row

    # Calculate trailing stops
    df = calculate_trailing_stops(df, trailing_stop_pct_sell, trailing_stop_pct_buy, days_offset, start_long)

    # Find buy and sell signals
    buy_date, buy_price, sell_date, sell_price, df = find_buy_and_sell_signals(df, trailing_stop_pct_buy, trailing_stop_pct_sell, days_offset, start_long)

    # Create and display the stock and trail stop levels
    create_plot(df, buy_date, buy_price, sell_date, sell_price, symbol, trailing_stop_pct_buy, trailing_stop_pct_sell, days_offset, start_long)

    # Create and display the scalogram of price and percent change.
    if compute_spectrogram:
        # Use 'Close' when the download carries no separate 'Adj Close' column.
        price_column = 'Adj Close' if 'Adj Close' in df.columns else 'Close'
        prices = df[price_column].values
        log_r = log_returns(prices)
        # Percent change of the log returns; a zero log return makes the
        # following element non-finite because it is used as the divisor.
        r2 = percent_change(log_r)
        s_scales = np.arange(1, len(prices) / 2, .05)
        display(hv_scalogram(prices).opts(width=900, title=f"{symbol} - Price Scalogram {start} - {end}"))
        display(hv_scalogram(log_r, scales = s_scales).opts(width=900, title=f"{symbol} - Log returns Scalogram {start} - {end}"))
        # NOTE(review): both curves are labelled 'Date' / 'Adj Close' although
        # they plot return series against their position - confirm the labels.
        display(hv.Curve(log_r, 'Date', 'Adj Close').opts(width=800, height=150, title=f"{symbol} - Log returns {start} - {end}"))
        display(hv.Curve(r2, 'Date', 'Adj Close').opts(width=800, height=150, title=f"{symbol} - 2nd Order returns {start} - {end}"))
        infodf = pd.DataFrame({'price': prices[1:], 'log_returns': log_r})
        display(infodf.describe())


# Input controls for the explorer. Every slider is laid out at the same width.
iwidth = '700px'

# Ticker entry.
symbol_input = widgets.Text(value='GOOG', description='Stock Symbol:')

# Length of the history window, in calendar days.
days_input = widgets.IntSlider(
    layout=Layout(width=iwidth),
    value=180,
    min=20,
    max=360,
    step=10,
    description='Days:',
)

# Row at which the trailing level starts.
days_offset = widgets.IntSlider(
    layout=Layout(width=iwidth),
    value=10,
    min=0,
    max=119,
    step=1,
    description='Start day:',
)

# Trailing percentages for the buy and sell levels.
trailing_stop_pct_buy_input = widgets.FloatSlider(
    layout=Layout(width=iwidth),
    value=10.0,
    min=0.1,
    max=25.0,
    step=0.1,
    description='Buy Trailing Stop (%):',
)
trailing_stop_pct_sell_input = widgets.FloatSlider(
    layout=Layout(width=iwidth),
    value=15.0,
    min=0.1,
    max=25.0,
    step=0.1,
    description='Sell Trailing Stop (%):',
)

# Direction toggle and the switch for the slower scalogram output.
start_state_input = widgets.Checkbox(value=True, description='Start Long (off for Short):')
compute_spectrogram_input = widgets.Checkbox(value=True, description='Spectrograms (slow):')


# Keeps the Start-day slider's upper bound in step with the Days slider.
def update_days_offset_range(*args):
    """Resize the Start-day slider from the Days slider and reset it to 3."""
    # 0.67 is a rough calendar-day to trading-day conversion factor.
    approx_trading_days = days_input.value * 0.67
    days_offset.max = int(round(approx_trading_days))
    days_offset.value = 3

# Re-derive the Start-day range whenever the Days slider value changes.
days_input.observe(update_days_offset_range, names='value')

# Event handler shared by all input controls.
def on_input_change(change):
    """Clear the cell output, show the controls again and redraw the plots."""
    # The controls are listed in the positional order plot_stock expects.
    controls = (
        symbol_input,
        days_input,
        trailing_stop_pct_buy_input,
        trailing_stop_pct_sell_input,
        days_offset,
        start_state_input,
        compute_spectrogram_input,
    )
    clear_output(wait=True)
    display(*controls)
    plot_stock(*[control.value for control in controls])

# Redraw when the symbol box is submitted and when any other control's value changes.
symbol_input.on_submit(on_input_change)
for _watched in (
    days_input,
    days_offset,
    trailing_stop_pct_buy_input,
    trailing_stop_pct_sell_input,
    start_state_input,
    compute_spectrogram_input,
):
    _watched.observe(on_input_change, names='value')

# Show the usage hint, the controls and the first plot.
display("Yahoo finance format for stock, index and fx symbols i.e MSFT, GOOG, BTC-USD, ARSUSD=X, ^IXIC")
display(
    symbol_input,
    days_input,
    trailing_stop_pct_buy_input,
    trailing_stop_pct_sell_input,
    days_offset,
    start_state_input,
    compute_spectrogram_input,
)
plot_stock(
    symbol_input.value,
    days_input.value,
    trailing_stop_pct_buy_input.value,
    trailing_stop_pct_sell_input.value,
    days_offset.value,
    start_state_input.value,
    compute_spectrogram_input.value,
)



Text(value='GOOG', description='Stock Symbol:')

IntSlider(value=180, description='Days:', layout=Layout(width='700px'), max=360, min=20, step=10)

FloatSlider(value=16.1, description='Buy Trailing Stop (%):', layout=Layout(width='700px'), max=25.0, min=0.1)

FloatSlider(value=4.4, description='Sell Trailing Stop (%):', layout=Layout(width='700px'), max=25.0, min=0.1)

IntSlider(value=10, description='Start day:', layout=Layout(width='700px'), max=119)

Checkbox(value=True, description='Start Long (off for Short):')

Checkbox(value=True, description='Spectrograms (slow):')

[*********************100%***********************]  1 of 1 completed


Unnamed: 0,price,log_returns
count,122.0,122.0
mean,99.972828,0.001915
std,8.899721,0.022262
min,86.459999,-0.077331
25%,92.397499,-0.012644
50%,99.575001,-0.001168
75%,105.885,0.016315
max,125.870003,0.070142
