### Candlestick & TA 

Using yf dataframe we can construct candles with the Open, Close, High, Low price.

Goal: Mathematically define each candle.

In [23]:
import numpy as np
import seaborn as sns
from datetime import datetime
import yfinance as yf
import talib
import matplotlib.pyplot as plt
import pandas as pd
import mplfinance as mpf
import matplotlib.gridspec as gridspec
import pingouin as pg

- Manual Candlestick Information: https://thepatternsite.com/CandleEntry.html ... Thomas Bulkowski defines 103 candlestick patterns
- Candlestick Library Information: https://medium.com/@crisvelasquez/automating-61-candlestick-trading-patterns-in-python-f28709c50684 
    - ta-lib Documentation: https://ta-lib.github.io/ta-lib-python/funcs.html 

In [24]:
class CandlestickPattern():
    def __init__(self, id, name, pattern_recog_func, description):
        self.id = id
        self.name = name
        self.pattern_func = pattern_recog_func
        self.description = description

In [25]:
"""
Candle Types:

bearish - bearish reversal pattern 
    100 if true
bullish - bullish reversal pattern 
    100 if true
cond - potential reversal or continuation (breakout - figure out how to deal with conditional patterns)
    +200 bullish pattern with confirmation
    +100 bullish pattern (most cases)
    0 none
    -100 bearish pattern
    -200 bearish pattern with confirmation
reversal - bull to bear or bear to bull
    100 if bull to bear, -100 if bear to bull
continuation - continuation of trend
    100 bull, -100 if bear
indecision - market doesn't know what to do
    100 if true
"""

pattern_funcs = [
    ("Two Crows", talib.CDL2CROWS, 'bearish'),
    ("Three Black Crows", talib.CDL3BLACKCROWS, 'bearish'),
    ("Three Inside Up/Down", talib.CDL3INSIDE, 'reversal'),
    ("Three-Line Strike", talib.CDL3LINESTRIKE, 'cond'),
    ("Three Outside Up/Down", talib.CDL3OUTSIDE, 'reversal'),
    ("Three Stars In The South", talib.CDL3STARSINSOUTH, 'bullish'),
    ("Three Advancing White Soldiers", talib.CDL3WHITESOLDIERS, 'bullish'),
    ("Abandoned Baby", talib.CDLABANDONEDBABY, 'reversal'),
    ("Advance Block", talib.CDLADVANCEBLOCK, 'bearish'),
    ("Belt-hold", talib.CDLBELTHOLD, 'reversal'),
    ("Breakaway", talib.CDLBREAKAWAY, 'reversal'),
    ("Closing Marubozu", talib.CDLCLOSINGMARUBOZU, 'continuation'),
    ("Concealing Baby Swallow", talib.CDLCONCEALBABYSWALL, 'bullish'),
    ("Counterattack", talib.CDLCOUNTERATTACK, 'reversal'),
    ("Dark Cloud Cover", talib.CDLDARKCLOUDCOVER, 'bearish'),
    ("Doji", talib.CDLDOJI, 'indecision'),
    ("Doji Star", talib.CDLDOJISTAR, 'reversal'),
    ("Dragonfly Doji", talib.CDLDRAGONFLYDOJI, 'bullish'),
    ("Engulfing Pattern", talib.CDLENGULFING, 'reversal'),
    ("Evening Doji Star", talib.CDLEVENINGDOJISTAR, 'bearish'),
    ("Evening Star", talib.CDLEVENINGSTAR, 'bearish'),
    ("Up/Down-gap side-by-side white lines", talib.CDLGAPSIDESIDEWHITE, 'continuation'),
    ("Gravestone Doji", talib.CDLGRAVESTONEDOJI, 'bearish'),
    ("Hammer", talib.CDLHAMMER, 'bullish'),
    ("Hanging Man", talib.CDLHANGINGMAN, 'bearish'),
    ("Harami Pattern", talib.CDLHARAMI, 'reversal'),
    ("Harami Cross Pattern", talib.CDLHARAMICROSS, 'indecision'),
    ("High-Wave Candle", talib.CDLHIGHWAVE, 'indecision'),
    ("Hikkake Pattern", talib.CDLHIKKAKE, 'cond'),
    ("Modified Hikkake Pattern", talib.CDLHIKKAKEMOD, 'cond'),
    ("Homing Pigeon", talib.CDLHOMINGPIGEON, 'bullish'),
    ("Identical Three Crows", talib.CDLIDENTICAL3CROWS, 'bearish'),
    ("In-Neck Pattern", talib.CDLINNECK, 'continuation'),
    ("Inverted Hammer", talib.CDLINVERTEDHAMMER, 'bullish'),
    ("Kicking", talib.CDLKICKING, 'reversal'),
    ("Kicking - bull/bear determined by the longer marubozu", talib.CDLKICKINGBYLENGTH, 'reversal'),
    ("Ladder Bottom", talib.CDLLADDERBOTTOM, 'bullish'),
    ("Long Legged Doji", talib.CDLLONGLEGGEDDOJI, 'indecision'),
    ("Long Line Candle", talib.CDLLONGLINE, 'cond'),
    ("Marubozu", talib.CDLMARUBOZU, 'cond'),
    ("Matching Low", talib.CDLMATCHINGLOW, 'bullish'),
    ("Mat Hold", talib.CDLMATHOLD, 'continuation'),
    ("Morning Doji Star", talib.CDLMORNINGDOJISTAR, 'bullish'),
    ("Morning Star", talib.CDLMORNINGSTAR, 'bullish'),
    ("On-Neck Pattern", talib.CDLONNECK, 'continuation'),
    ("Piercing Pattern", talib.CDLPIERCING, 'bullish'),
    ("Rickshaw Man", talib.CDLRICKSHAWMAN, 'indecision'),
    ("Rising/Falling Three Methods", talib.CDLRISEFALL3METHODS, 'continuation'),
    ("Separating Lines", talib.CDLSEPARATINGLINES, 'continuation'),
    ("Shooting Star", talib.CDLSHOOTINGSTAR, 'bearish'),
    ("Short Line Candle", talib.CDLSHORTLINE, 'indecision'),
    ("Spinning Top", talib.CDLSPINNINGTOP, 'indecision'),
    ("Stalled Pattern", talib.CDLSTALLEDPATTERN, 'bearish'),
    ("Stick Sandwich", talib.CDLSTICKSANDWICH, 'bullish'),
    ("Takuri (Dragonfly Doji with very long lower shadow)", talib.CDLTAKURI, 'bullish'),
    ("Tasuki Gap", talib.CDLTASUKIGAP, 'continuation'),
    ("Thrusting Pattern", talib.CDLTHRUSTING, 'continuation'),
    ("Tristar Pattern", talib.CDLTRISTAR, 'reversal'),
    ("Unique 3 River", talib.CDLUNIQUE3RIVER, 'bullish'),
    ("Upside Gap Two Crows", talib.CDLUPSIDEGAP2CROWS, 'bearish'),
    ("Upside/Downside Gap Three Methods", talib.CDLXSIDEGAP3METHODS, 'continuation')
]

""" Consider object oriented - store image + description """
# for pattern in pattern_funcs:
#     CandlestickPattern()

# Retrieve data from yf
# data = yf.download("AAPL", start="2022-05-01", end="2022-05-03",  interval = "1m")
symbol = "SPY"
period = "5y"
data = yf.download(symbol, period = period)

df_columns = ['Asset','Time Period','Length of Return','Candle Pattern','Value', 'Avg % Return Following Pattern', 
             'Avg % Return In General', 'Avg Volume Following Pattern', 'Avg Volume In General', 'p-value Returns', 
             'p-value Volume','Occurrences']

[*********************100%%**********************]  1 of 1 completed


### Pattern Recognition
Each tuple in pattern_funcs contains the pattern's name and the TA-Lib function that identifies it. These functions will be applied to the stock data to find occurrences of each pattern.
- For each pattern, the corresponding TA-Lib function is called with the Open, High, Low, and Close prices of the stock.
- The result is a series where non-zero values indicate the presence of the pattern. These dates are stored in pattern_dates.

### Visualization
The visualization segment of the master script then uses Matplotlib and mplfinance to graphically represent the stock data and the detected patterns.
- A figure and a grid layout are set up for plotting.
- Market colors are defined (green for up, red for down) and a custom style is created for mplfinance.

The script then plots the closing prices of the stock and highlights dates where patterns were found. The main subplot (ax1) shows the stock's closing prices.
Vertical lines and annotations are added on dates where patterns were identified, marked in red.

In [None]:
for pattern_name, pattern_func, pattern_type in pattern_funcs:
    data[pattern_name] = pattern_func(data['Open'], data['High'], data['Low'], data['Close'])
    pattern_dates = data[data[pattern_name] != 0].index

#     # Skip if there are no detected patterns of this type
#     if len(pattern_dates) == 0:
#         continue

#     fig = plt.figure(figsize=(20, 10))
#     gs = gridspec.GridSpec(2, 4)

#     mc = mpf.make_marketcolors(up='g', down='r', inherit=True)
#     custom_style = mpf.make_mpf_style(marketcolors=mc)

#     ax1 = plt.subplot(gs[0, :3])
#     data[['Close']].plot(ax=ax1, color='blue')
#     for date in pattern_dates:
#         ax1.axvline(date, color='red', linestyle='--', label=pattern_name if pattern_name not in [l.get_label() for l in ax1.lines] else "")
#         ax1.annotate(date.strftime('%Y-%m-%d'), (date, data['Close'].loc[date]), xytext=(-15,10+20), 
#                      textcoords='offset points', color='red', fontsize=12, rotation=90)

#     window = 5  # Days before and after the pattern. Currentle will ignore weekends b/c intersection occurs on data.index (only dates available in price action dataframe).
#                 # So this window isn't always going to be equal, it depends on where the data intersects with the index of data 
#                 # Just assume candle of interest is in middle

#     for i in range(5):
#         if len(pattern_dates) > i:
#             pattern_date = pattern_dates[-(i+1)]

#             start_date = pattern_date - pd.Timedelta(days=window)
#             end_date = min(data.index[-1], pattern_date + pd.Timedelta(days=window))
#             valid_dates = pd.date_range(start=start_date, end=end_date).intersection(data.index)

#             subset = data.loc[valid_dates]

#             if i == 0:
#                 ax = plt.subplot(gs[0, 3])
#             else:
#                 ax = plt.subplot(gs[1, i-1])

#             mpf.plot(subset, type='candle', ax=ax, volume=False, show_nontrading=False, style=custom_style)
#             ax.set_title(f'{pattern_name} Pattern {i+1} for {symbol}')

#             x_ticks = list(range(0, len(valid_dates), 1))
#             x_labels = [date.strftime('%Y-%m-%d') for date in valid_dates]
#             ax.set_xticks(x_ticks)
#             ax.set_xticklabels(x_labels, rotation=90)

#     ax1.set_title(f"{symbol} Stock Price and {pattern_name} Pattern Detection")
#     ax1.legend(loc='best')
#     ax1.grid(True)
#     ax1.set_xlabel("Date")
#     ax1.set_ylabel("Price")
#     plt.tight_layout()
#     plt.show()

### Evaluating Accuracy
How reliably do the identified patterns predict future price movements (based on their designated bullish or bearish nature)?

1. Setting Up the Evaluation Framework: Define a timeframe (in days) post the occurrence of a pattern to observe the subsequent price movement to assess the predictive effectiveness of each pattern.
2. Pattern Recognition Strategy: Employing our pattern_funcs array, we identify and classify patterns, as bullish or bearish.
3. Employing the pattern_funcs array, our script systematically applied TA-Lib functions to detect patterns in the dataset.
4. Quantifying Accuracy: For each pattern identified, we calculated the success rate based on the alignment of price movement with the expected bullish or bearish indication.
    - For bullish patterns, a success was counted if the closing price after the set timeframe was higher than the pattern’s formation price.
    - Conversely, for bearish patterns, a success was noted if the closing price was lower.
5. Handling Data Limitations: One would need to to manage cases where the set timeframe extended beyond the dataset

In [None]:
# If value == 100, the reversal is bullish to bearish. If value == -100, reversal is bearish to bullish

data_processed = pd.DataFrame(columns = df_columns)

timeframe_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,14]     # Analyze the price movement over each of the length of days following pattern identification
avg_returns_list = []                                   # Store avg returns over timeframe following each pattern (pattern_name, avg_returns_following)

data = data.reset_index()
data['pct_chg'] = data['Close'].pct_change() * 100
data = data.dropna()
# print(data['Doji'].value_counts())
# print(data[['Date', 'Doji']].head(50))
# print(data.loc[0, 'Doji'])
# print(data.info())

# Add length of returns to dataframe
for timeframe in timeframe_list:
    col_price = '{} lor'.format(timeframe)
    col_vol = '{} vol'.format(timeframe)
    data[col_price] = data['pct_chg'].rolling(window=timeframe, min_periods=1).sum().shift(- (timeframe))
    data[col_vol] = data['Volume'].rolling(window=timeframe, min_periods=1).mean().shift(- (timeframe))

print(data[['Volume', '7 vol', '7 lor']].head(15))

    # rolling(window=5, min_periods=1): This creates a rolling window of size 5. 
    # The min_periods=1 argument ensures that the calculation is performed even if there are fewer than 5 values available at the beginning of the DataFrame.
    # sum(): This function calculates the sum of the values within each rolling window.
    # shift(-4): This shifts the resulting column 4 rows up, so that each row now contains the sum of the next 5 values (excluding the current row).

def find_p_value(list, timeframe):

    """
    Find p-value given two sets of data

    Inputs:
        - list : list of indices in which pattern occurs : [31, 43, 56, 58, ...]
        - timeframe : the lor that is being evaluated (1,2,3,4,5,6, ...)
    """

    # Get column of lor based on timeframe
    col_price = '{} lor'.format(timeframe)
    col_vol = '{} vol'.format(timeframe)

    # Drop all data that is not = target value
    x_process = data.drop(list)

    # Find x and y for t-test for identifying % returns significance
    x_price = x_process[col_price]
    y_price = data.iloc[list][col_price]  ### THIS IS THE LINE THAT IS GIVING ERROR FOR DOJI

    # Find x and y for t-test for identifying volume significance
    x_vol = x_process[col_vol]
    y_vol = data.iloc[list][col_vol]

    # Break down how to calculate statistical significance based on value associated with candlestick pattern
        # EX1: 100 for bullish reversal would be greater than test for lor after pattern vs any random lor
        # EX2: 100 for continuation would be greater than test for lor after pattern vs any random lor

    """
    *  Currently not specific to value of pattern: simply finding price + volume significance  *

    bearish - bearish reversal pattern 
        100 if true: less than test for lor after pattern vs any random lor
    bullish - bullish reversal pattern 
        100 if true: greater than test for lor after pattern vs any random lor
    cond - potential reversal or continuation (breakout - figure out how to deal with conditional patterns)
        +200 bullish pattern with confirmation: 
        +100 bullish pattern (most cases): 
        -100 bearish pattern: 
        -200 bearish pattern with confirmation: 
    reversal - bull to bear or bear to bull
        100 if bull to bear: 
        -100 if bear to bull: 
    continuation - continuation of trend
        100 bull: 
        -100 if bear: 
    indecision - market doesn't know what to do
        100 if true: 
    """

    # Degrees of freedom (or sample size - 1) cannot be 0
    # Find returns % over lor statistical significance
    if len(y_price) > 1: 
        results_price = pg.ttest(
            x_price, y_price, alternative = 'two-sided', paired=False, correction=True)
        p_price = np.round(results_price.loc[results_price.index[0], 'p-val'],2)
    else:
        p_price = 1

    # Find returns volume over lor statistical significance
    if len(y_vol) > 1:
        results_vol = pg.ttest(
            x_vol, y_vol, alternative = 'two-sided', paired=False, correction=True)
        p_vol = np.round(results_vol.loc[results_vol.index[0], 'p-val'],2)
    else:
        p_vol = 1

    return (p_price, p_vol)


       Volume         7 vol     7 lor
1    40709000  6.435537e+07  2.380168
2    48927000  6.609404e+07  1.488675
3    34838500  6.799326e+07  1.809342
4    52649800  7.265907e+07  2.330751
5    53429100  8.604664e+07  1.993911
6    96389600  7.984670e+07  1.284783
7    81503900  7.109900e+07  1.227986
8    82749700  6.369350e+07  1.073311
9    61097700  6.104080e+07  1.026622
10   48133000  6.126881e+07  0.469045
11   85310500  5.723550e+07  0.302075
12  147142100  4.466537e+07  1.290238
13   52990000  4.819676e+07  0.380241
14   20270000  5.325160e+07  0.758630
15   30911200  5.462091e+07 -0.054874


In [None]:
def get_indices_by_type(pattern_type):

    if (pattern_type == 'bearish') | (pattern_type == 'bullish') | (pattern_type == 'indecision'):              # If pattern is a reversal (bullish or bearish) or indecision, the only acceptable value is 100
        
        pattern_indices_list = data[data[pattern_name] == 100].index                                                 # Store all indices where pattern exists

        pattern_indices = [(100, pattern_indices_list)]
    
    elif (pattern_type == 'reversal'):                                                                          # If pattern is reversal either direction, -100 for bear to bull, 100 for bull to bear

        pattern_indices_bearish_to_bullish = data[data[pattern_name] == -100].index
        pattern_indices_bullish_to_bearish = data[data[pattern_name] == 100].index
        
        pattern_indices = [(-100, pattern_indices_bearish_to_bullish), (100, pattern_indices_bullish_to_bearish)]

    elif (pattern_type == 'continuation'):                                                                      # If pattern is continuation, -100 for bear, 100 for bull

        pattern_indices_bearish = data[data[pattern_name] == -100].index
        pattern_indices_bullish = data[data[pattern_name] == 100].index

        pattern_indices = [(-100, pattern_indices_bearish), (100, pattern_indices_bullish)]

    elif (pattern_type == 'cond'):

        pattern_indices_bearish = data[data[pattern_name] == -100].index
        pattern_indices_bearish_confirmed = data[data[pattern_name] == -200].index
        pattern_indices_bullish = data[data[pattern_name] == 100].index
        pattern_indices_bullish_confirmed = data[data[pattern_name] == 200].index

        pattern_indices = [(-100, pattern_indices_bearish), (-200, pattern_indices_bearish_confirmed), (100, pattern_indices_bullish), (200, pattern_indices_bullish_confirmed)]

    return pattern_indices

avg_returns_list_by_timeframe = []

for timeframe in timeframe_list:

    for pattern_name, pattern_func, pattern_type in pattern_funcs:

        """
        Loop through all candlestick patterns, find average return and volume over given timeframe.

        If there are no occurrences, set values to nan. If 1 occurrence, can't perform t-test due to degrees of freedom - 1 = 0 --> creating divide by 0 case
        """
        
        pattern_indices = get_indices_by_type(pattern_type)

        for list in pattern_indices:

            # Value associated with candle (-200, -100, 100, 200)
            value = list[0]

            # List of indices that contain value (100 : [47, 54, 58, 203, ...])
            list = list[1]

            # Skip if there are no detected patterns of this type
            if len(list) == 0:
                # Reset vars in the case that len is 0
                avg_returns_following = np.nan
                avg_vol_following = np.nan
                avg_returns_dataset = np.round(np.mean(data[col_price]), 4)
                avg_vol_dataset = round(np.mean(data[col_vol]))
                p_price = np.nan
                p_vol = np.nan
                occurrences = 0
            else:
                # Drop most recent data point if it's today, giving problems
                if list[-1] == len(data):
                    list = list[0:len(list)-1]

                if len(list) == 0:
                    # Reset vars in the case that len is 0 AFTER editing list
                    avg_returns_following = np.nan
                    avg_vol_following = np.nan
                    avg_returns_dataset = np.round(np.mean(data[col_price]), 4)
                    avg_vol_dataset = round(np.mean(data[col_vol]))
                    p_price = np.nan
                    p_vol = np.nan
                    occurrences = 0
                    continue

                # Col name for avg price returns over lor
                col_price = '{} lor'.format(timeframe)

                # Col name for avg price returns over lor
                col_vol = '{} vol'.format(timeframe)

                # Find p-value for each value that is in pattern indices (could be 4 for conditional, or 1 for bullish reversal)
                # Pass in list of indices that are associated with candle pattern and value
                p_val = find_p_value(list, timeframe)
                p_price = p_val[0]
                p_vol = p_val[1]

                # Reset with new list being iterated
                occurrences = len(list)

                avg_returns_following = np.round(np.mean(data.iloc[list][col_price]), 4)
                avg_returns_dataset = np.round(np.mean(data.loc[~data.index.isin(list), col_price]), 4)
                avg_vol_following = round(np.mean(data.iloc[list][col_vol]))
                avg_vol_dataset = round(np.mean(data.loc[~data.index.isin(list), col_vol]))

                # New row to be appended to dataframe
            new_row = {'Asset': symbol, 'Time Period' : period, 'Length of Return' : timeframe, 'Candle Pattern' : pattern_name, 
                'Value' : value, 'Avg % Return Following Pattern' : avg_returns_following, 'Avg % Return In General' : avg_returns_dataset,
                'Avg Volume Following Pattern' : avg_vol_following, 'Avg Volume In General' : avg_vol_dataset, 'p-value Returns' : p_price, 
                'p-value Volume' : p_vol, 'Occurrences' : occurrences}
            data_processed.loc[len(data_processed)] = new_row

print(data_processed.head())

  Asset Time Period  Length of Return        Candle Pattern  Value  \
0   SPY          5y                 1             Two Crows    100   
1   SPY          5y                 1     Three Black Crows    100   
2   SPY          5y                 1  Three Inside Up/Down   -100   
3   SPY          5y                 1  Three Inside Up/Down    100   
4   SPY          5y                 1     Three-Line Strike   -100   

   Avg % Return Following Pattern  Avg % Return In General  \
0                             NaN                   0.8339   
1                             NaN                   0.8339   
2                          0.0987                   0.0684   
3                          0.1043                   0.0659   
4                          2.5697                   0.0635   

   Avg Volume Following Pattern  Avg Volume In General  p-value Returns  \
0                           NaN               82110944              NaN   
1                           NaN               82110944  

In [None]:
import csv

def write_csv_header():
    '''
    overwrite csv file with only a header
    '''
    # Open the file in write mode
    with open('candlestick_analysis.csv', 'w', newline='') as file:
        writer = csv.writer(file)
        # Write a header row
        writer.writerow(
            df_columns
        )


def append_to_csv():
    '''
    append data to csv file
    '''
    # # Open the file in write mode
    # with open('candlestick_analysis.csv', 'a', newline='') as file:
    #     writer = csv.writer(file)

    #     rows = [
    #         [
    #             data_processed['Asset'], data_processed['Time Period'], data_processed['Length of Return'], data_processed['Candle Pattern'],
    #             data_processed['Value'], data_processed['Avg % Return'], data_processed['p-value Returns'], data_processed['p-value Volume'],
    #             data_processed['Occurrences']
    #         ]
    #     ]
    #     writer.writerows(rows)

    data_processed.to_csv('candlestick_analysis.csv', index= False)


print(data_processed.head(20))
write_csv_header()
append_to_csv()

   Asset Time Period  Length of Return                  Candle Pattern  Value  \
0    SPY          5y                 1                       Two Crows    100   
1    SPY          5y                 1               Three Black Crows    100   
2    SPY          5y                 1            Three Inside Up/Down   -100   
3    SPY          5y                 1            Three Inside Up/Down    100   
4    SPY          5y                 1               Three-Line Strike   -100   
5    SPY          5y                 1               Three-Line Strike   -200   
6    SPY          5y                 1               Three-Line Strike    100   
7    SPY          5y                 1               Three-Line Strike    200   
8    SPY          5y                 1           Three Outside Up/Down   -100   
9    SPY          5y                 1           Three Outside Up/Down    100   
10   SPY          5y                 1        Three Stars In The South    100   
11   SPY          5y        

### Further Improvements
There are several areas where our candlestick pattern recognition script can be further refined and improved:

Incorporating Machine Learning: Integrating machine learning models can provide a more dynamic approach to identify and classify candlestick patterns more accurately.
Broader Range of Patterns: Expanding the library of candlestick patterns recognized by the script to include less common, yet potentially significant patterns, offering a more comprehensive market analysis tool.
Integration with Other Technical Indicators: Combining candlestick pattern recognition with other technical indicators like moving averages, RSI, and MACD can provide a more holistic view of the trading conditions.