This script is meant to find the dates with high peaks, low dips, intercepts between two moving averages, high volume, and gaps.

NOTE: This creates a JSON file!! (not a .csv)

In [None]:
import pandas, json, numpy, requests, os, datetime, pytz, tweepy, sqlite3, time, re, random, matplotlib.pyplot as plt
from bs4 import BeautifulSoup
from scipy.signal import find_peaks

# Goal:
### To obtain the dates where stock price peaks are, when the trend changes, where there is high volume, 
### where there is a large difference in the open and close of a day, and where there are gaps.


####_________________________________________________Definitions___________(See Bottom of Script for Single Example of Agilent Technologies)____________________

def _half_year_extrema(window_df, distance=30):
    """Locate pronounced local highs and lows of ``Close`` inside one window.

    A high peak is a local maximum whose close is at least ``mean + std`` of
    the window; a low dip is a local minimum whose close is at most
    ``mean - std`` of the window.

    Parameters
    ----------
    window_df : pandas.DataFrame
        Non-empty slice of the price history; must have a ``Close`` column.
    distance : int
        Minimum number of rows between two reported extrema of the same kind
        (forwarded to ``scipy.signal.find_peaks``).

    Returns
    -------
    tuple
        ``(peak_positions, dip_positions)``: integer positions into
        ``window_df``.
    """
    closes = numpy.array(window_df.Close, dtype=float)
    mean_close = numpy.average(closes)
    std_close = numpy.std(closes)

    peak_positions, _ = find_peaks(closes, height=mean_close + std_close, distance=distance)

    # find_peaks() only searches for maxima, so the series is negated to turn
    # dips into peaks. The threshold must be negated as a whole:
    # -(mean - std). Negating only the std gives mean + std, which a negated
    # (all-negative) price series can never reach.
    dip_positions, _ = find_peaks(closes * -1, height=(mean_close - std_close) * -1, distance=distance)

    return peak_positions, dip_positions


def peaks_dips_per_6_months(symbol, company_name, plot='no', partial_year=2020):
    """Return the dates of the high peaks and low dips of a stock's close.

    The daily price CSV for the stock is read from the "Daily Stock Prices"
    folder under the current working directory. Every calendar year is split
    into two windows (Jan 1 - May 31 and Jun 1 - Dec 31) and the extrema are
    searched per window, so the thresholds adapt to the local price level.

    Parameters
    ----------
    symbol : str
        Ticker symbol, first part of the CSV file name.
    company_name : str
        Company name, second part of the CSV file name.
    plot : str
        ``'yes'`` draws the close, the detected extrema, each window's average
        (horizontal line) and each window's last date (vertical line).
    partial_year : int or None
        Year that is analysed as one single window instead of two halves
        (the year the data set was still being collected in). ``None`` splits
        every year.

    Returns
    -------
    tuple of list
        ``(high_peaks, low_dips)``: two lists of ``pandas.Timestamp`` in
        chronological order. Both are empty when the CSV has no rows.
    """
    price_df = pandas.read_csv(os.getcwd() + "\\Daily Stock Prices\\" + symbol + ' - ' + company_name + '.csv', index_col= 'Date', parse_dates= True)
    price_df = price_df.sort_index(ascending=True)

    high_peaks = []
    low_dips = []

    if price_df.empty:
        return high_peaks, low_dips

    plotting = plot == 'yes'
    if plotting:
        # The figure has to exist before anything is drawn on it; creating it
        # afterwards leaves the price line on a separate, default-sized figure.
        plt.figure(figsize=[15, 15])
        plt.plot(price_df.index, price_df.Close, color='yellow')

    first_year = price_df.index.min().year
    last_year = price_df.index.max().year

    for year in range(first_year, last_year + 1):
        if year == partial_year:
            # Capped at Dec 31 so rows of later years are not counted twice.
            windows = [(pandas.Timestamp(year, 1, 1), pandas.Timestamp(year, 12, 31))]
        else:
            windows = [
                (pandas.Timestamp(year, 1, 1), pandas.Timestamp(year, 5, 31)),
                (pandas.Timestamp(year, 6, 1), pandas.Timestamp(year, 12, 31)),
            ]

        for window_start, window_end in windows:
            in_window = (window_start <= price_df.index) & (price_df.index <= window_end)
            window_df = price_df.loc[in_window]

            # A history that starts mid-year simply has no rows in the earlier
            # window; nothing to detect or draw there.
            if window_df.empty:
                continue

            peak_positions, dip_positions = _half_year_extrema(window_df)
            high_peaks.extend(window_df.index[peak_positions])
            low_dips.extend(window_df.index[dip_positions])

            if plotting:
                plt.plot(window_df.index[peak_positions], window_df.Close.iloc[peak_positions], 'o', color='red')
                plt.plot(window_df.index[dip_positions], window_df.Close.iloc[dip_positions], 'x', color='blue')
                plt.hlines(y=numpy.average(window_df.Close), xmin=window_df.index.min(), xmax=window_df.index.max(), color='black')
                plt.vlines(x=window_df.index.max(), ymin=window_df.Close.min(), ymax=window_df.Close.max(), color='black')

    if plotting:
        plt.show()

    return high_peaks, low_dips


#_________________________________________________________________________________________________
#### Examining trend changes:
#### Uptrend = when the price for the next day is higher than the price for the current day
#### Downtrend = when the price for the next day is lower than the price for the current day
#### Will be using simple moving averages (SMA) and exponential moving averages (30 & 60 days) as the trend examiner:
# (https://towardsdatascience.com/implementing-moving-averages-in-python-1ad28e636f9d)

# Returns the dates where the 30- and 60-day moving averages intersect, as 4 lists: 2 lists for the day before each intercept & 2 lists for the day after it
def moving_average(symbol, company_name, plot='no', moving_average_range=(30, 60), ma_type='sma'):
    """Return the dates around each crossing of a fast and a slow moving average.

    The daily price CSV for the stock is read from the "Daily Stock Prices"
    folder under the current working directory, and two moving averages of
    ``Close`` are compared day by day. Only historical crossings are found:
    each one needs the following day's values.

    Parameters
    ----------
    symbol : str
        Ticker symbol, first part of the CSV file name.
    company_name : str
        Company name, second part of the CSV file name.
    plot : str
        ``'yes'`` draws the close, both averages and the crossing markers.
    moving_average_range : sequence of two ints
        ``(fast_window, slow_window)`` in rows (trading days).
    ma_type : str
        ``'sma'`` for simple (rolling mean) or ``'ema'`` for exponential
        moving averages.

    Returns
    -------
    tuple of four lists of pandas.Timestamp
        ``(below_current, below_next, above_current, above_next)``:

        * ``below_*``: the fast average is strictly below the slow one on the
          "current" day and at or above it on the "next" day, i.e. it crosses
          the slow average from below.
        * ``above_*``: the fast average is strictly above the slow one on the
          "current" day and at or below it on the "next" day, i.e. it crosses
          the slow average from above.

    Raises
    ------
    ValueError
        If ``ma_type`` is neither ``'sma'`` nor ``'ema'``.
    """
    price_df = pandas.read_csv(os.getcwd() + "\\Daily Stock Prices\\" + symbol + ' - ' + company_name + '.csv', index_col= 'Date', parse_dates= True)
    price_df = price_df.sort_index(ascending=True)

    fast_window = moving_average_range[0]
    slow_window = moving_average_range[1]

    if ma_type == 'sma':
        ma_30 = price_df.Close.rolling(window=fast_window).mean()
        ma_60 = price_df.Close.rolling(window=slow_window).mean()
    elif ma_type == 'ema':
        ma_30 = price_df.Close.ewm(span=fast_window, adjust=False).mean()
        ma_60 = price_df.Close.ewm(span=slow_window, adjust=False).mean()
    else:
        raise ValueError("ma_type must be 'sma' or 'ema', got " + repr(ma_type))

    # Work on plain arrays: integer subscripts on a date-indexed Series rely on
    # a deprecated positional fallback.
    fast = ma_30.to_numpy(dtype=float)
    slow = ma_60.to_numpy(dtype=float)

    # Skip the warm-up rows where the longer average is not established yet
    # (60 rows for the default 30/60 windows).
    start = max(fast_window, slow_window)

    # Pair every day with its successor; the last row has no "next" day.
    current_fast, current_slow = fast[start:-1], slow[start:-1]
    next_fast, next_slow = fast[start + 1:], slow[start + 1:]

    # Comparisons involving NaN are False, so incomplete averages never match.
    crosses_from_below = (current_fast < current_slow) & (next_fast >= next_slow)
    crosses_from_above = (current_fast > current_slow) & (next_fast <= next_slow)

    from_below_positions = numpy.flatnonzero(crosses_from_below) + start
    from_above_positions = numpy.flatnonzero(crosses_from_above) + start

    dates = ma_30.index
    ma_30_below_intercept_current = list(dates[from_below_positions])
    ma_30_below_intercept_next = list(dates[from_below_positions + 1])
    ma_30_above_intercept_current = list(dates[from_above_positions])
    ma_30_above_intercept_next = list(dates[from_above_positions + 1])

    if plot == 'yes':
        plt.plot(price_df.index, price_df.Close, color='yellow')
        plt.plot(price_df.index, ma_30, color='black')
        plt.plot(price_df.index, ma_60, color='blue')
        # '|' marks crossings from above, '+' marks crossings from below.
        plt.plot(ma_30_above_intercept_current, price_df.Close.loc[ma_30_above_intercept_current], '|', color= 'black')
        plt.plot(ma_30_below_intercept_current, price_df.Close.loc[ma_30_below_intercept_current], '+', color= 'black')
        plt.show()

    return ma_30_below_intercept_current, ma_30_below_intercept_next, ma_30_above_intercept_current, ma_30_above_intercept_next

#______________________________________________________________________________________________
## Find which dates had the highest volume
def highest_volume(symbol, company_name, plot_volume='no', plot_dates='no'):
    """Return the dates whose traded volume is unusually high.

    The daily price CSV for the stock is read from the "Daily Stock Prices"
    folder under the current working directory. A day qualifies when its
    ``Volume`` is at least the average volume plus one standard deviation,
    both taken over the whole file.

    Parameters
    ----------
    symbol : str
        Ticker symbol, first part of the CSV file name.
    company_name : str
        Company name, second part of the CSV file name.
    plot_volume : str
        ``'yes'`` draws the volume bars with the average and threshold lines.
    plot_dates : str
        ``'yes'`` draws the close with a marker on every qualifying date.

    Returns
    -------
    pandas.Index
        The qualifying entries of the date index, in ascending order.
    """
    csv_path = os.getcwd() + "\\Daily Stock Prices\\" + symbol + ' - ' + company_name + '.csv'
    price_df = pandas.read_csv(csv_path, index_col='Date', parse_dates=True)
    price_df = price_df.sort_index(ascending=True)

    volumes = price_df.Volume
    show_volume = plot_volume == 'yes'
    show_dates = plot_dates == 'yes'

    if show_volume:
        first_day = price_df.index.min()
        last_day = price_df.index.max()
        plt.bar(price_df.index, volumes, width=1)
        plt.hlines(numpy.average(volumes), first_day, last_day)
        plt.hlines(numpy.average(volumes) + numpy.std(volumes), first_day, last_day)
        # The view is fixed to 2004-2005 and to the above-average volumes.
        plt.axis([pandas.Timestamp(2004, 1, 1), pandas.Timestamp(2006, 1, 1), numpy.average(volumes), max(volumes)])

    # Highest volume = average + standard deviation
    highest_volume_threshold = numpy.average(volumes) + numpy.std(volumes)
    is_high_volume = volumes >= highest_volume_threshold
    highest_volume_dates = price_df.index[is_high_volume]

    if show_dates:
        plt.plot(price_df.index, price_df.Close, color='yellow')
        plt.plot(highest_volume_dates, price_df.Close.loc[highest_volume_dates], '|', color='black')
        # The view is fixed to 2004-2005 and to prices between 0 and 50.
        plt.axis([pandas.Timestamp(2004, 1, 1), pandas.Timestamp(2006, 1, 1), 0, 50])

    if show_volume or show_dates:
        plt.show()

    return highest_volume_dates


#______________________________________________________________________________________________
## Find where large gaps are located  <<<<<----------- This is what I am aiming to find to relate to articles
### The difference between the close and the next day open 
def get_gaps(symbol, company_name, plot='no'):
    """Return the dates around large overnight gaps in a stock's price.

    The daily price CSV for the stock is read from the "Daily Stock Prices"
    folder under the current working directory. A gap compares one day's
    ``Close`` with the following day's ``Open``:

    * close below 5: the open must differ from the close by more than 20%;
    * otherwise: the open must differ from the close by more than 10%.

    Parameters
    ----------
    symbol : str
        Ticker symbol, first part of the CSV file name.
    company_name : str
        Company name, second part of the CSV file name.
    plot : str
        ``'yes'`` draws the close with markers on the day before each gap.

    Returns
    -------
    tuple of four lists of pandas.Timestamp
        ``(gap_up_current, gap_up_next, gap_down_current, gap_down_next)``:
        for each gap, the "current" list holds the day of the close and the
        "next" list the day of the open.
    """
    price_df = pandas.read_csv(os.getcwd() + "\\Daily Stock Prices\\" + symbol + ' - ' + company_name + '.csv', index_col= 'Date', parse_dates= True)
    price_df = price_df.sort_index(ascending=True)

    # Pair every close with the following day's open; the last row has no
    # following day and is therefore dropped from the closes.
    closes = price_df.Close.to_numpy(dtype=float)[:-1]
    next_opens = price_df.Open.to_numpy(dtype=float)[1:]

    # Cheaper stocks move more in relative terms, so they need a bigger jump.
    low_priced = closes < 5
    up_factor = numpy.where(low_priced, 1.20, 1.10)
    down_factor = numpy.where(low_priced, .80, .90)

    # A gap down means the open is BELOW the discounted close, for both price
    # classes (comparisons involving NaN are False and never match).
    gap_up_positions = numpy.flatnonzero(next_opens > closes * up_factor)
    gap_down_positions = numpy.flatnonzero(next_opens < closes * down_factor)

    dates = price_df.index
    gap_up_current = list(dates[gap_up_positions])
    gap_up_next = list(dates[gap_up_positions + 1])
    gap_down_current = list(dates[gap_down_positions])
    gap_down_next = list(dates[gap_down_positions + 1])

    if plot == 'yes':
        plt.plot(price_df.index, price_df.Close, color='yellow')
        plt.plot(gap_up_current, price_df.Close.loc[gap_up_current], '+', color= 'black')
        plt.plot(gap_down_current, price_df.Close.loc[gap_down_current], '|', color= 'blue')
        plt.show()

    return gap_up_current, gap_up_next, gap_down_current, gap_down_next


## _____________________________________________Combine All Dates & Export to JSON___________________________________________________
# All stocks will have a separate .json file
# JSON is used to maintain the dictionary form. Since the lists do not all have the same length, fitting them into a DataFrame would create many empty cells.
### Creating a function that calls the functions defined above:

def patterns_combine_export(symbol, company_name, plot='no', moving_average_range=[30, 60], ma_type='sma', plot_volume='no', plot_dates='no'):
    """Collect every pattern date of one stock and write them to a JSON file.

    Runs the peak/dip, moving-average, volume and gap detectors for the stock,
    converts every returned timestamp to a 'YYYY-MM-DD' string (JSON cannot
    store timestamp objects) and dumps the result as one dictionary to
    "<symbol> - <company_name> - Patterns.json" inside the "Patterns" folder
    under the current working directory. The folder is created when missing.

    Parameters
    ----------
    symbol, company_name : str
        Identify the stock; forwarded to every detector and used in the
        output file name.
    plot : str
        Forwarded to the peak/dip, moving-average and gap detectors.
    moving_average_range, ma_type
        Forwarded to ``moving_average``.
    plot_volume, plot_dates : str
        Forwarded to ``highest_volume``.

    Returns
    -------
    None
    """
    def as_date_strings(timestamps):
        # '2020-01-01'-style text for every timestamp, order preserved.
        return [str(stamp.date()) for stamp in timestamps]

    peak_dates, dip_dates = peaks_dips_per_6_months(symbol, company_name, plot)

    # NOTE(review): the original author remarked that the "current" and "next"
    # returns of moving_average appear switched around - not verified here.
    crossings = moving_average(symbol, company_name, plot=plot, moving_average_range=moving_average_range, ma_type=ma_type)
    below_current, below_next, above_current, above_next = crossings

    volume_dates = highest_volume(symbol, company_name, plot_volume=plot_volume, plot_dates=plot_dates)

    up_current, up_next, down_current, down_next = get_gaps(symbol, company_name, plot=plot)

    temp_dict = {
        'Symbol': symbol,
        'Name': company_name,
        'High_Peaks_Dates': as_date_strings(peak_dates),
        'Low_Dips_Dates': as_date_strings(dip_dates),
        "Moving_Average_30_Down_Before_Intercept_Dates": as_date_strings(below_current),
        "Moving_Average_30_Down_After_Intercept_Dates": as_date_strings(below_next),
        "Moving_Average_30_Up_Before_Intercept_Dates": as_date_strings(above_current),
        "Moving_Average_30_Up_After_Intercept_Dates": as_date_strings(above_next),
        "Highest_Volume_Dates": as_date_strings(volume_dates),
        "Gap_Up_Before_Dates": as_date_strings(up_current),
        "Gap_Up_After_Dates": as_date_strings(up_next),
        "Gap_Down_Before_Dates": as_date_strings(down_current),
        "Gap_Down_After_Dates": as_date_strings(down_next),
    }

    filepath = os.getcwd() + '\\Patterns\\'
    if not os.path.exists(filepath):
        os.makedirs(filepath)

    output_name = filepath + symbol + ' - ' + company_name + ' - Patterns.json'
    with open(output_name, 'w') as file_opener:
        json.dump(temp_dict, file_opener)


#### =================================================Looping Through Stocks=============================================================
## Since the price history of the stocks is the foundation of the project, only the stocks used in Alpha_Vantage_API_Price_Daily will be processed.
## The first part from that script is copied below:

# Load the tab-separated exchange listings and combine them into one table.
NYSE_csv = pandas.read_csv('NYSE.txt', sep="\t", header=0).set_index('Symbol')

AMEX_csv = pandas.read_csv('AMEX.txt', sep="\t", header=0).set_index('Symbol')

stock_exchange_ticks_and_names = pandas.merge(NYSE_csv.reset_index(), AMEX_csv.reset_index(), how='outer')
stock_exchange_ticks_and_names.to_csv('merged_NYSE_AMEX.csv')
stock_exchange_ticks_and_names_copy = stock_exchange_ticks_and_names.copy().dropna()

# Raw string so the backslashes reach the regex engine untouched, and the
# inner '[' is escaped so it is unambiguously a literal bracket. The set of
# matched characters is the same as before.
regex1 = re.compile(r'[@_!#$%^&*()<>?/\|}{~:\[\].]')
regex2 = re.compile('Cl ')

# Keep only listings whose description has no special character and no 'Cl '
# marker, and whose symbol has no special character.
keep_row = [
    not (regex1.search(description) or regex2.search(description) or regex1.search(ticker))
    for ticker, description in zip(stock_exchange_ticks_and_names_copy['Symbol'], stock_exchange_ticks_and_names_copy['Description'])
]
stock_exchange_ticks_and_names_removed = stock_exchange_ticks_and_names_copy.loc[keep_row, ['Symbol', 'Description']]

# The three index entries are appended after the filter, so their
# dot-prefixed symbols are not removed by it.
stock_indices_df = pandas.DataFrame({'Description': ['S&P', 'Dow', "Nasdaq"], 'Symbol': ['.INX', '.DJI', ".IXIC"]})

stocks_and_names_with_indices = pandas.concat([stock_exchange_ticks_and_names_removed, stock_indices_df])
stocks_and_names_with_indices = stocks_and_names_with_indices.set_index('Symbol')
stocks_and_names_with_indices = stocks_and_names_with_indices.reset_index()
stocks_and_names_with_indices.to_csv('merged_NYSE_AMEX_removed_patterns.csv')

### Looping begins:
for symbol, name in zip(stocks_and_names_with_indices['Symbol'], stocks_and_names_with_indices['Description']):
    price_file = os.getcwd() + "\\Daily Stock Prices\\" + symbol + ' - ' + name + '.csv'

    # Only stocks with a downloaded price history can be analysed.
    if not os.path.exists(price_file):
        continue

    try:
        patterns_combine_export(symbol= symbol, company_name= name)
    except Exception as error:
        # Best-effort batch run: one bad file must not stop the others, but
        # the failure is reported instead of disappearing silently.
        print(symbol + ' skipped: ' + repr(error))
    else:
        print(symbol)