In [1]:
import os
import warnings
warnings.filterwarnings('ignore')

import gc
import linecache
import matplotlib as plt
import mplfinance as mpf
import numpy as np
import pandas as pd
import tracemalloc 

from analysis import detect_and_rename
from tradingpatterns_basic import detect_multiple_tops_bottoms, detect_triangle_pattern, detect_wedge, detect_channel, detect_double_top_bottom

This notebook walks through all of the locally downloaded price data and runs several pattern-detection scripts to generate chart images, which can then be fed into the CNN.

In [2]:
# Fix the seed of NumPy's global RNG so np.random.shuffle is repeatable between runs.
np.random.seed(42)

In [14]:
# Memory leak tracker from python documentation
def display_top(snapshot, key_type='lineno', limit=10):
    """Print the largest allocations recorded in a tracemalloc snapshot.

    Traces whose file name matches ``<frozen importlib._bootstrap>`` or
    ``<unknown>`` are filtered out first. The remaining statistics are
    grouped by ``key_type``; the first ``limit`` entries are printed one
    per line (with the source line when linecache can find it), followed
    by a summary of the remaining entries and the overall total.

    Args:
        snapshot: Object providing ``filter_traces`` and ``statistics``,
            e.g. the result of ``tracemalloc.take_snapshot()``.
        key_type: Grouping key passed to ``snapshot.statistics``.
        limit: Number of entries to list individually.
    """
    excluded = tuple(
        tracemalloc.Filter(False, pattern)
        for pattern in ("<frozen importlib._bootstrap>", "<unknown>")
    )
    stats = snapshot.filter_traces(excluded).statistics(key_type)
    listed, remainder = stats[:limit], stats[limit:]

    print("Top %s lines" % limit)
    for rank, entry in enumerate(listed, start=1):
        origin = entry.traceback[0]
        print("#%s: %s:%s: %.1f KiB"
              % (rank, origin.filename, origin.lineno, entry.size / 1024))
        source_line = linecache.getline(origin.filename, origin.lineno).strip()
        if source_line:
            print('    %s' % source_line)

    if remainder:
        remainder_bytes = sum(entry.size for entry in remainder)
        print("%s other: %.1f KiB" % (len(remainder), remainder_bytes / 1024))
    grand_total = sum(entry.size for entry in stats)
    print("Total allocated size: %.1f KiB" % (grand_total / 1024))

In [15]:
# Load the S&P 500 constituents from the first table on the Wikipedia page
# and keep only the ticker symbols as a plain Python list.
tickers = pd.read_html(
    'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
)[0]['Symbol'].to_list()

In [16]:
def read_csv(tick: str):
    """Load one ticker's CSV and return its numeric columns on a natural-log scale.

    Reads ``data/<tick>_data.csv``, renames the lower-case ``open``, ``close``,
    ``high``, ``low`` and ``volume`` columns to their capitalised forms,
    coerces every column to numeric (values that cannot be parsed become NaN)
    and applies ``np.log``. The ``Datetime`` column is then restored with its
    original values, and the ``adj_close`` / ``adj_high`` / ``adj_low``
    columns are removed when they are present.

    Args:
        tick: Ticker symbol used to build the CSV file name.

    Returns:
        A DataFrame with the log-transformed columns plus the untouched
        ``Datetime`` column.

    Raises:
        FileNotFoundError: If the CSV file for ``tick`` does not exist.
        KeyError: If the file has no ``Datetime`` column.
    """
    raw = pd.read_csv('data/' + tick + '_data.csv', index_col=None, header=0)
    # Keep the original timestamps aside: the numeric coercion below would
    # replace any non-numeric timestamp with NaN.
    datetimes = raw['Datetime']
    renamed = raw.rename(columns={"open": "Open", "close": "Close", "high": "High",
                                  "low": "Low", 'volume': 'Volume'})
    data = np.log(renamed.apply(pd.to_numeric, errors='coerce'))
    # Put the saved timestamps back in place of the coerced column.
    data['Datetime'] = datetimes
    # errors='ignore' lets files without the adjusted-price columns load too,
    # instead of failing with a KeyError.
    return data.drop(columns=['adj_close', 'adj_high', 'adj_low'], errors='ignore')

In [17]:
# Plot the candlestick chart and save it to disk; `count` is used as the image id.
# There appears to be a memory leak in matplotlib, which seems to keep
# each rendered image in memory every time a chart is drawn.
def plot_simple_df(candle_data: pd.DataFrame, stock_type: str, tick: str,  count: int, pad: int = 2):
    """Render a window of candles and save it as a PNG.

    The image is written to
    ``images/<stock_type>/<stock_type>_<tick>_<count>.png``; the directory is
    created when it does not exist yet. ``candle_data`` itself is left
    untouched: the plot is drawn from a copy that is indexed by the
    ``Datetime`` column.

    Args:
        candle_data: Rows to plot. Must contain a ``Datetime`` column that
            ``pd.DatetimeIndex`` can parse; the remaining columns are passed
            to ``mpf.plot`` as-is (presumably Open/High/Low/Close/Volume --
            confirm against mplfinance's requirements).
        stock_type: Pattern label, used as sub-directory and file-name prefix.
        tick: Ticker symbol, used in the file name.
        count: Identifier appended to the file name.
        pad: Currently unused; kept so existing callers keep working.
    """
    fname = "images/"+ stock_type +"/" + stock_type + '_' + tick + '_' + str(count) + ".png"
    # Saving fails if the target folder is missing, so create it up front.
    os.makedirs(os.path.dirname(fname), exist_ok=True)

    # Build a new frame instead of editing the argument in place, so the
    # caller's DataFrame (often a slice of a bigger frame) is not modified.
    frame = candle_data.drop(columns=['Datetime'])
    frame.index = pd.DatetimeIndex(candle_data['Datetime'])

    mpf.plot(frame, type='candle', axisoff=True, style = 'classic', savefig=dict(fname=fname, dpi=60), closefig=True)

In [18]:
def shuffle_list(ticker_list: list):
    """Shuffle ``ticker_list`` in place with NumPy's global RNG and return it.

    Meant to be called after each pattern run so the next run draws from a
    differently ordered list of stocks. Because ``np.random.shuffle`` uses
    the global NumPy RNG, the resulting order is repeatable when that RNG
    has been seeded.

    Args:
        ticker_list: List of ticker symbols; it is reordered in place.

    Returns:
        The same list object that was passed in, now shuffled.
    """
    # Shuffle the list that was actually passed in rather than a global, so
    # the function works for any list; passing the global list behaves as before.
    np.random.shuffle(ticker_list)
    return ticker_list

In [19]:
def gen_hs(data: pd.DataFrame, tick: str):
    """Save chart images around detected head-and-shoulders rows.

    Runs ``detect_and_rename(data, 'wavelet', 3)`` and walks the rows whose
    ``head_shoulder_pattern_wavelet`` value is not null. A row is used only
    when it lies more than 15 labels from the start, leaves more than 15 rows
    before the end, and its label is greater than a running marker. Each
    time a row is used the marker grows by 100 and is also passed to
    ``plot_simple_df`` as the image id.

    Args:
        data: Price data for one ticker (assumes a default integer index,
            since labels are compared with ``len(data)`` -- TODO confirm).
        tick: Ticker symbol forwarded to ``plot_simple_df``.
    """
    half_window = 15
    step = 100
    detected = detect_and_rename(data, 'wavelet', 3)
    flagged = detected[detected['head_shoulder_pattern_wavelet'].notnull()]
    n_rows = len(detected)

    marker = 0
    for row in flagged.index:
        # Skip rows too close to either end, or not yet past the marker.
        if row + half_window >= n_rows or marker >= row or row <= half_window:
            continue
        marker += step
        window = detected.loc[row - half_window : row + half_window]
        plot_simple_df(window, 'hs', tick, marker, 2)

In [20]:
def gen_mtb(data: pd.DataFrame, tick: str):
    """Save chart images around detected multiple top/bottom rows.

    Runs ``detect_multiple_tops_bottoms(data, window=10)`` and walks the rows
    whose ``multiple_top_bottom_pattern`` value is not null. A row is used
    only when it lies more than 15 labels from the start, leaves more than 15
    rows before the end, and its label is greater than a running marker.
    Each time a row is used the marker grows by 25 and is also passed to
    ``plot_simple_df`` as the image id.

    Args:
        data: Price data for one ticker (assumes a default integer index,
            since labels are compared with ``len(data)`` -- TODO confirm).
        tick: Ticker symbol forwarded to ``plot_simple_df``.
    """
    half_window = 15
    step = 25
    detected = detect_multiple_tops_bottoms(data, window = 10)
    flagged = detected[detected['multiple_top_bottom_pattern'].notnull()]
    n_rows = len(detected)

    marker = 0
    for row in flagged.index:
        # Skip rows too close to either end, or not yet past the marker.
        if row + half_window >= n_rows or marker >= row or row <= half_window:
            continue
        marker += step
        window = detected.loc[row - half_window : row + half_window]
        plot_simple_df(window, 'mtb', tick, marker, 2)

In [21]:
def gen_tri(data: pd.DataFrame, tick: str):
    """Save chart images around detected triangle-pattern rows.

    Runs ``detect_triangle_pattern(data, window=3)`` and walks the rows whose
    ``triangle_pattern`` value is not null. A row is used only when it lies
    more than 15 labels from the start, leaves more than 15 rows before the
    end, and its label is greater than a running marker. Each time a row is
    used the marker grows by 125 and is also passed to ``plot_simple_df`` as
    the image id.

    Args:
        data: Price data for one ticker (assumes a default integer index,
            since labels are compared with ``len(data)`` -- TODO confirm).
        tick: Ticker symbol forwarded to ``plot_simple_df``.
    """
    half_window = 15
    step = 125
    detected = detect_triangle_pattern(data, window=3)
    flagged = detected[detected['triangle_pattern'].notnull()]
    n_rows = len(detected)

    marker = 0
    for row in flagged.index:
        # Skip rows too close to either end, or not yet past the marker.
        if row + half_window >= n_rows or marker >= row or row <= half_window:
            continue
        marker += step
        window = detected.loc[row - half_window : row + half_window]
        plot_simple_df(window, 'tri', tick, marker, 2)

In [22]:
def gen_wedge(data: pd.DataFrame, tick: str):
    """Save chart images around detected wedge patterns, per direction.

    Runs ``detect_wedge(data, window=3)`` and processes the rows labelled
    ``'Wedge Up'`` (saved as ``wed_up``) and then ``'Wedge Down'`` (saved as
    ``wed_down``). Within each direction a row is used only when it lies more
    than ``lookback`` labels from the start, leaves more than ``lookback``
    rows before the end, and its label is greater than a running marker.
    The marker grows by 100 per saved image and doubles as the image id.

    The per-direction image counter is now actually incremented, so the
    ``count > 25`` cap takes effect: a direction stops once its 26th image
    has been saved. Previously the counter stayed at 0 and the cap never
    triggered.

    Args:
        data: Price data for one ticker (assumes a default integer index,
            since labels are compared with ``len(data)`` -- TODO confirm).
        tick: Ticker symbol forwarded to ``plot_simple_df``.
    """
    data = detect_wedge(data, window=3)
    lookback = 20
    # Changing params to change how many patterns are pulled from a single stock
    for label, stock_type in (('Wedge Up', 'wed_up'), ('Wedge Down', 'wed_down')):
        matches = data[data['wedge_pattern'] == label]
        # Marker and counter start afresh for each direction.
        loc = 0
        count = 0
        for i in matches.index:
            if i + lookback < len(data) and loc < i and i > lookback:
                loc += 100
                plot_simple_df(data.loc[i - lookback : i + lookback], stock_type, tick, loc, 2)
                count += 1
            if count > 25:
                break

In [44]:
def gen_chann(data: pd.DataFrame, tick: str):
    """Save chart images around detected channel patterns, per direction.

    Runs ``detect_channel(data, window=3)`` and processes the rows labelled
    ``'Channel Up'`` (saved as ``chan_up``) and then ``'Channel Down'``
    (saved as ``chan_down``). Within each direction a row is used only when
    it lies more than ``lookback`` labels from the start, leaves more than
    ``lookback`` rows before the end, and its label is greater than a running
    marker. The marker grows by 100 per saved image and doubles as the
    image id.

    The per-direction image counter is now actually incremented, so the
    ``count > 25`` cap takes effect: a direction stops once its 26th image
    has been saved. Previously the counter stayed at 0 and the cap never
    triggered.

    Args:
        data: Price data for one ticker (assumes a default integer index,
            since labels are compared with ``len(data)`` -- TODO confirm).
        tick: Ticker symbol forwarded to ``plot_simple_df``.
    """
    data = detect_channel(data, window=3)
    lookback = 25
    # Changing params to change how many patterns are pulled from a single stock
    for label, stock_type in (('Channel Up', 'chan_up'), ('Channel Down', 'chan_down')):
        matches = data[data['channel_pattern'] == label]
        # Marker and counter start afresh for each direction.
        loc = 0
        count = 0
        for i in matches.index:
            if i + lookback < len(data) and loc < i and i > lookback:
                loc += 100
                plot_simple_df(data.loc[i - lookback : i + lookback], stock_type, tick, loc, 2)
                count += 1
            if count > 25:
                break

In [45]:
def gen_double(data: pd.DataFrame, tick: str):
    """Save chart images around detected double tops and double bottoms.

    Runs ``detect_double_top_bottom(data, window=3, threshold=0.1)`` and
    processes the rows labelled ``'Double Top'`` (saved as ``doub_top``) and
    then ``'Double Bottom'`` (saved as ``doub_bot``). Within each kind a row
    is used only when it lies more than ``lookback`` labels from the start,
    leaves more than ``lookback`` rows before the end, and its label is
    greater than a running marker. The marker grows by 100 per saved image
    and doubles as the image id.

    The per-kind image counter is now actually incremented, so the
    ``count > 25`` cap takes effect: a kind stops once its 26th image has
    been saved. Previously the counter stayed at 0 and the cap never
    triggered.

    Args:
        data: Price data for one ticker (assumes a default integer index,
            since labels are compared with ``len(data)`` -- TODO confirm).
        tick: Ticker symbol forwarded to ``plot_simple_df``.
    """
    data = detect_double_top_bottom(data, window=3, threshold=0.1)
    lookback = 20
    # Changing params to change how many patterns are pulled from a single stock
    for label, stock_type in (('Double Top', 'doub_top'), ('Double Bottom', 'doub_bot')):
        matches = data[data['double_pattern'] == label]
        # Marker and counter start afresh for each kind.
        loc = 0
        count = 0
        for i in matches.index:
            if i + lookback < len(data) and loc < i and i > lookback:
                loc += 100
                plot_simple_df(data.loc[i - lookback : i + lookback], stock_type, tick, loc, 2)
                count += 1
            if count > 25:
                break

In [46]:
# Generate a variety of patterns
def generate_patterns(pat: str, bound1 = 0, bound2 = 2):
    """Generate pattern images for the tickers in ``tickers[bound1:bound2]``.

    For each selected ticker the CSV is loaded with ``read_csv`` and handed
    to the generator that matches ``pat``. A failure on one ticker is
    printed and the loop moves on to the next ticker.

    The pattern name is checked before any file is read, so an unknown name
    is always reported with ``There is no pattern`` and no CSV is loaded.

    Args:
        pat: One of ``'wavelet'``, ``'mtb'``, ``'tri'``, ``'wed'``,
            ``'chan'`` or ``'double'``.
        bound1: Start position in ``tickers``; negative values are raised to 0.
        bound2: End position (exclusive) in ``tickers``. When greater than
            500 it is replaced by ``bound1 + 2``.

    Returns:
        None. Prints ``Error with bounds`` and returns early when
        ``bound2 < bound1``.
    """
    if bound2 < bound1:
        print("Error with bounds")
        return
    if bound1 < 0:
        bound1 = 0
    if bound2 > 500:
        bound2 = bound1 + 2

    generators = {
        # Accurately detects head and shoulders by using wavelet noise reduction
        'wavelet': gen_hs,
        # This does not generate anything?
        'mtb': gen_mtb,
        # Only detects ascending and descending triangles
        'tri': gen_tri,
        # Finds ascending and descending wedges
        'wed': gen_wedge,
        # Finds ascending and descending channels
        'chan': gen_chann,
        # Finds double tops and double bottoms
        'double': gen_double,
    }
    generate = generators.get(pat)
    if generate is None:
        # Reject an unknown pattern up front instead of after reading a CSV.
        print('There is no pattern')
        return

    for t in tickers[bound1:bound2]:
        try:
            # Read the CSV, then generate the patterns and save them as images.
            generate(read_csv(t), t)
        except Exception as e:
            # Best effort: report the ticker and carry on with the rest.
            print(f"Failed on ticker {t} with exception {e}")
            continue

In [ ]:
# tracemalloc.start() 
# generate_patterns('wavelet')

# snapshot = tracemalloc.take_snapshot()
# display_top(snapshot)
# gc.collect()

In [ ]:
# tracemalloc.start() 
# generate_patterns('mtb')

# snapshot = tracemalloc.take_snapshot()
# display_top(snapshot)
# gc.collect()

In [None]:
# tracemalloc.start() 
# generate_patterns('tri')

# snapshot = tracemalloc.take_snapshot()
# display_top(snapshot)
# gc.collect()

In [None]:
# generate_patterns('wed', 351, 450)
# 
# gc.collect()

In [32]:
# generate_patterns('chan', 351, 400)

In [55]:
# generate_patterns('double', 350, 500)

Failed on ticker RVTY with exception [Errno 2] No such file or directory: 'data/RVTY_data.csv'
Failed on ticker SLB with exception [Errno 2] No such file or directory: 'data/SLB_data.csv'


In [56]:
gc.collect()

37417461